#!/usr/bin/env python3
#
#   Copyright 2018 Google, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import logging
import os
import re
import socket
import time
import urllib.request

from acts import asserts
from acts import signals
from acts import utils
from acts.controllers import adb
from acts.controllers.adb_lib.error import AdbError
from acts.libs.proc import job
from acts.utils import start_standing_subprocess
from acts.utils import stop_standing_subprocess
from acts_contrib.test_utils.net import connectivity_const as cconst
from acts_contrib.test_utils.tel import tel_defines
from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name
from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection

from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
37
# Short aliases for the VPN constant groups defined in connectivity_const.
VPN_CONST = cconst.VpnProfile
VPN_TYPE = cconst.VpnProfileType
VPN_PARAMS = cconst.VpnReqParams

# On-device directory where tcpdump capture files are written and pulled from.
TCPDUMP_PATH = "/data/local/tmp/"

# adb shell commands for switching USB functions.
# NOTE(review): the "charge mode" command passes no function name; presumably
# that resets the USB functions to the default - confirm.
USB_CHARGE_MODE = "svc usb setFunctions"
USB_TETHERING_MODE = "svc usb setFunctions rndis"

# adb shell commands toggling the tether_offload_disabled global setting.
ENABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 0"
DISABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 1"

DEVICE_IP_ADDRESS = "ip address"

# VPN server address for which certificates are downloaded directly by the
# device rather than by the host.
LOCALHOST = "192.168.1.1"

# gcloud command prefixes (note the trailing space) for reaching a GCE host.
GCE_SSH = "gcloud compute ssh "
GCE_SCP = "gcloud compute scp "
51
52
def verify_lte_data_and_tethering_supported(ad):
    """Check that cellular data works and that tethering is supported.

    Wi-Fi is turned off and the data connection turned on before the HTTP
    check, so the check runs on the cell data connection. Wi-Fi is turned back
    on only if execution reaches the end of the function.

    Args:
        ad: android device object
    """
    # Move the device onto cellular data only.
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(ad.log, ad, True)

    http_ok = verify_http_connection(ad.log, ad)
    asserts.assert_true(
        http_ok, "HTTP verification failed on cell data connection")

    tethering_supported = ad.droid.connectivityIsTetheringSupported()
    asserts.assert_true(
        tethering_supported, "Tethering is not supported for the provider")

    wutils.wifi_toggle_state(ad, True)
65
66
def set_chrome_browser_permissions(ad):
    """Prepare Chrome so it starts without the first-run flow.

    Grants Chrome read/write access to external storage, marks it as the
    persistent debug app and writes a chrome-command-line file with the
    no-first-run flags. Each command is best effort: a failing adb command is
    logged as a warning and the remaining commands still run.

    Args:
        ad: android device object
    """
    setup_cmds = (
        "pm grant com.android.chrome "
        "android.permission.READ_EXTERNAL_STORAGE",
        "pm grant com.android.chrome "
        "android.permission.WRITE_EXTERNAL_STORAGE",
        "rm /data/local/chrome-command-line",
        "am set-debug-app --persistent com.android.chrome",
        'echo "chrome --no-default-browser-check --no-first-run '
        '--disable-fre" > /data/local/tmp/chrome-command-line',
    )
    for setup_cmd in setup_cmds:
        try:
            ad.adb.shell(setup_cmd)
        except AdbError:
            logging.warning(
                "adb command %s failed on %s" % (setup_cmd, ad.serial))
88
89
def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """Verify if IP behind VPN server is pingable.

    Ping should pass, if VPN is connected.
    Ping should fail, if VPN is disconnected.

    Args:
        ad: android device object
        vpn_ping_addr: target ping addr

    Returns:
        True if the ping command produced output that does not report
        "100% packet loss"; False if the adb command raised AdbError, produced
        no output, or reported total packet loss.
    """
    total_loss_marker = "100% packet loss"
    logging.info("Pinging: %s", vpn_ping_addr)
    try:
        ping_output = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError:
        # A failed ping command is treated as "not reachable".
        return False
    # bool() so that empty or missing output yields False rather than ""/None.
    return bool(ping_output) and total_loss_marker not in ping_output
108
109
def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """Run the connect / verify / disconnect cycle for one legacy VPN profile.

    Steps:
      1. Flush the device's xfrm state and start the legacy VPN
      2. Verify via LegacyVpnInfo that the connection is established
      3. Ping the IP behind the VPN and log the HMAC found in the xfrm state
      4. Stop the VPN connection
      5. Assert that the earlier ping succeeded
      6. Assert that no legacy VPN info is reported any more

    Args:
        ad: Android device object
        vpn_profile: object contains attribute for create vpn profile
        vpn_ping_addr: addr to verify vpn connection
    """
    # Give the VPN server time to flush all interfaces and connections after
    # a previous graceful termination.
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_equal(
        vpn_info["state"],
        cconst.VPN_STATE_CONNECTED,
        "Unable to establish VPN connection for %s" % vpn_profile)

    # The ping result is only asserted after the VPN has been stopped below.
    ping_ok = verify_ping_to_vpn_ip(ad, vpn_ping_addr)

    xfrm_output = ad.adb.shell("ip xfrm state")
    hmac_match = re.search(r'hmac(.*)', "%s" % xfrm_output)
    if hmac_match:
        hmac_tokens = format(hmac_match.group(0)).split()
        ad.log.info("HMAC for ESP is %s " % hmac_tokens[0])

    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(
        ping_ok,
        "Ping to the internal IP failed. "
        "Expected to pass as VPN is connected")

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_true(
        not vpn_info,
        "Unable to terminate VPN connection for %s" % vpn_profile)
158
159
def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """Download the certificates from VPN server and push to sdcard of DUT.

    When the server address equals LOCALHOST the device downloads the file
    itself. Otherwise the host downloads it into log_path and pushes it to the
    device with adb.

    Args:
      ad: android device object
      vpn_params: vpn params from config file
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Client cert file name on DUT's sdcard
    """
    pkcs_file_name = vpn_params['client_pkcs_file_name']
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             pkcs_file_name)
    logging.info("URL is: %s", url)
    if vpn_server_addr == LOCALHOST:
        ad.droid.httpDownloadFile(url, "/sdcard/")
        return pkcs_file_name

    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    pkcs_file_name)
    local_file_path = os.path.join(log_path, local_cert_name)
    try:
        # Context managers close both the HTTP response and the local file,
        # even when reading or writing fails part-way through.
        with urllib.request.urlopen(url) as response, \
                open(local_file_path, "wb") as cert_file:
            cert_file.write(response.read())
    except Exception:
        # Keep the underlying cause in the log before failing the test.
        logging.exception("Failed to download certificate from %s", url)
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name
196
197
def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """Build the profile dictionary for a legacy VPN type.

    PSK types get the pre-shared secret, RSA types get a client certificate
    that is downloaded and installed on the device, and every other type gets
    the "mppe" setting.

    Args:
      ad: android device object
      vpn_params: vpn params from config file
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Vpn profile
    """
    type_name = vpn_type.name
    profile = {
        VPN_CONST.USER: vpn_params['vpn_username'],
        VPN_CONST.PWD: vpn_params['vpn_password'],
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: vpn_server_addr,
    }

    # PPTP profiles are named without the ipsec server type suffix.
    if type_name == "PPTP":
        profile[VPN_CONST.NAME] = "test_%s" % type_name
    else:
        profile[VPN_CONST.NAME] = "test_%s_%s" % (type_name,
                                                  ipsec_server_type)

    psk_types = ("L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK")
    rsa_types = ("L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA")

    if type_name in psk_types:
        profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
    elif type_name in rsa_types:
        cert_name = download_load_certs(
            ad, vpn_params, vpn_type, vpn_server_addr, ipsec_server_type,
            log_path)
        # The certificate alias is the file name up to the first dot.
        profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
        ad.droid.installCertificate(
            profile, cert_name, vpn_params['cert_password'])
    else:
        profile[VPN_CONST.MPPE] = "mppe"

    return profile
245
246
def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
    """Build the profile dictionary for an IKEv2 VPN type.

    Args:
        ad: android device object.
        vpn_params: vpn params from config file.
        vpn_type: ikev2 vpn type.
        server_addr: vpn server addr.
        log_path: log path to download cert.

    Returns:
        Vpn profile.
    """
    type_name = vpn_type.name
    profile = {
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: server_addr,
    }

    # Pre-shared key: no certificate is needed.
    if type_name == "IKEV2_IPSEC_PSK":
        profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
        return profile

    # Username/password: the downloaded certificate is used as the CA cert.
    if type_name == "IKEV2_IPSEC_USER_PASS":
        profile[VPN_CONST.USER] = vpn_params["vpn_username"]
        profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
        profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        # NOTE(review): the download uses vpn_params["server_addr"], not the
        # server_addr argument - confirm this is intended.
        cert_name = download_load_certs(
            ad, vpn_params, vpn_type, vpn_params["server_addr"],
            "IKEV2_IPSEC_USER_PASS", log_path)
        profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
        ad.droid.installCertificate(
            profile, cert_name, vpn_params['cert_password'])
        return profile

    # Any other type: the same certificate serves as user cert and CA cert.
    profile[VPN_CONST.IPSEC_ID] = "%s@%s" % (
        vpn_params["vpn_identity"], server_addr)
    logging.info("ID: %s@%s" % (vpn_params["vpn_identity"], server_addr))
    cert_name = download_load_certs(
        ad, vpn_params, vpn_type, vpn_params["server_addr"],
        "IKEV2_IPSEC_RSA", log_path)
    cert_alias = cert_name.split('.')[0]
    profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
    profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias
    ad.droid.installCertificate(
        profile, cert_name, vpn_params['cert_password'])
    return profile
291
292
def start_tcpdump(ad, test_name, interface="any"):
    """Start a tcpdump capture on the device.

    Kills any running tcpdump, makes sure TCPDUMP_PATH exists and is empty,
    then launches tcpdump through adb as a standing subprocess.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this
        interface: interface passed to tcpdump's -i option

    Returns:
        The standing subprocess, or None if it could not be started.
    """
    ad.log.info("Starting tcpdump on all interfaces")
    cleanup_cmds = (
        "killall -9 tcpdump",
        "mkdir %s" % TCPDUMP_PATH,
        "rm -rf %s/*" % TCPDUMP_PATH,
    )
    for cleanup_cmd in cleanup_cmds:
        ad.adb.shell(cleanup_cmd, ignore_status=True)

    pcap_path = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", pcap_path)
    tcpdump_cmd = "adb -s {} shell tcpdump -i {} -s0 -w {}".format(
        ad.serial, interface, pcap_path)
    try:
        proc = start_standing_subprocess(tcpdump_cmd, 5)
    except Exception:
        ad.log.exception(
            'Could not start standing process %s' % repr(tcpdump_cmd))
        proc = None
    return proc
315
316
def stop_tcpdump(ad,
                 proc,
                 test_name,
                 pull_dump=True,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stop a tcpdump capture and optionally pull the capture files.

    Nothing is stopped or pulled when proc is None. A failure while stopping
    the subprocess is logged as a warning and does not prevent the pull.

    Args:
        ad: android device object.
        proc: need to know which pid to stop
        test_name: test name to save the tcpdump file
        pull_dump: pull tcpdump file or not
        adb_pull_timeout: timeout for adb_pull

    Returns:
      Host path of the pulled tcpdump file, or None when proc is None or
      pull_dump is false
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None

    try:
        stop_standing_subprocess(proc)
    except Exception as e:
        ad.log.warning(e)

    if not pull_dump:
        return None

    host_dir = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
    os.makedirs(host_dir, exist_ok=True)
    # Pull the whole capture directory, then empty it on the device.
    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, host_dir),
                timeout=adb_pull_timeout)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
    pcap_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
    return "%s/%s" % (host_dir, pcap_name)
352
353
def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """Start tcpdump on the gce server over gcloud ssh.

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance

    Returns:
       process id and pcap file path (without extension) from gce server

    Raises:
        signals.TestFailure: if the process listing on the server shows no
            matching tcpdump entry
    """
    ad.log.info("Starting tcpdump on gce server")

    # Remote file stem; tcpdump writes <stem>.pcap and <stem>.txt.
    timestamp = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time()))
    fname = "/tmp/%s_%s_%s_%s" % (test_name, ad.model, ad.serial, timestamp)

    # The run of spaces before "%s.txt" reproduces the original command
    # string exactly.
    tcpdump_template = ("sudo bash -c 'tcpdump -i %s -w %s.pcap port %s > "
                        "        %s.txt 2>&1 & echo $!'")
    tcpdump_cmd = tcpdump_template % (
        gce["interface"], fname, dest_port, fname)
    ssh_prefix = "%s --project=%s --zone=%s %s@%s --command " % (
        GCE_SSH, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    utils.exe_cmd('%s "%s"' % (ssh_prefix, tcpdump_cmd))

    # Look the process up by the file stem; the second whitespace-separated
    # field of the listing is returned as the pid.
    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (ssh_prefix, fname)
    ps_fields = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
    if not ps_fields:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    return ps_fields[1], fname
387
388
def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """Stop tcpdump on the gce server and copy the pcap file to the host.

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path
        gce: dictionary of gce instance

    Returns:
       Host path of the pcap file, or None when fname is empty

    Raises:
        signals.TestFailure: if the pid still shows up in the process listing
            after the kill, or the pcap file is missing on the host after
            the copy
    """
    ad.log.info("Stop and pull pcap file from gce server")

    # Kill the remote tcpdump process.
    kill_cmd = "sudo kill %s" % tcpdump_pid
    ssh_prefix = "%s --project=%s --zone=%s %s@%s --command " % (
        GCE_SSH, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    utils.exe_cmd('%s "%s"' % (ssh_prefix, kill_cmd))

    # Confirm the pid no longer appears in the process listing.
    ps_output = utils.exe_cmd(
        '%s "ps aux | grep tcpdump"' % ssh_prefix).decode("utf-8", "ignore")
    if tcpdump_pid in ps_output.split():
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # Copy the pcap file into the device log directory on the host.
    scp_prefix = "%s --project=%s --zone=%s %s@%s:" % (
        GCE_SCP, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    utils.exe_cmd('%s%s.pcap %s/' % (scp_prefix, fname, ad.device_log_path))
    local_pcap = "%s/%s.pcap" % (ad.device_log_path, fname.split('/')[-1])
    if not os.path.exists(local_pcap):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # Remove the capture files from the server.
    utils.exe_cmd('%s "sudo rm %s.*"' % (ssh_prefix, fname))

    return local_pcap
433
434
def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not (including when ip_address is not a string)
    """
    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, TypeError, ValueError):
        # OSError (socket.error): not a valid IPv6 literal.
        # TypeError / ValueError: not a string, or a string that cannot be
        # passed to inet_pton (e.g. embedded NUL).
        return False
    return True
450
451
def carrier_supports_ipv6(dut):
    """Tell whether the device's operator is in the IPv6-capable carrier list.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if carrier supports ipv6
        False: if not
    """
    ipv6_carriers = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")
    # NOTE(review): the literal string "log" is passed where a logger is
    # expected - confirm get_operator_name does not use it.
    return get_operator_name("log", dut) in ipv6_carriers
466
467
def is_carrier_supports_entitlement(dut):
    """Tell whether the device's operator is in the entitlement carrier list.

    Args:
        dut: Android device

    Return:
        True if carrier supports entitlement, otherwise false
    """
    entitlement_carriers = (
        tel_defines.CARRIER_VZW,
        tel_defines.CARRIER_ATT,
        tel_defines.CARRIER_TMO,
        tel_defines.CARRIER_SBM,
    )
    # NOTE(review): the literal string "log" is passed where a logger is
    # expected - confirm get_operator_name does not use it.
    return get_operator_name("log", dut) in entitlement_carriers
485
486
def set_cap_net_raw_capability():
    """Set the CAP_NET_RAW capability

    To send the Scapy packets, we need to get the CAP_NET_RAW capability
    first. It is set on the resolved binaries of act.py and python, in that
    order.
    """
    for program in ("act.py", "python"):
        utils.exe_cmd(
            "sudo setcap cap_net_raw=eip $(readlink -f $(which %s))" % program)
496
497
def supports_ipv6_tethering(self, dut):
    """Check if provider supports IPv6 tethering.

    Args:
        self: object providing the `log` attribute passed to
            get_operator_name
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    tethering_carriers = ("vzw", "tmo", "Far EasTone", "Chunghwa Telecom")
    return get_operator_name(self.log, dut) in tethering_carriers
511
512
def start_usb_tethering(ad):
    """Start USB tethering using #startTethering API.

    Enabling USB tethering through #startTethering breaks the RPC session, so
    call #ad.recreate_services after calling this function - b/149116235

    Args:
        ad: AndroidDevice object
    """
    tethering_type = tel_defines.TETHERING_USB
    ad.droid.connectivityStartTethering(tethering_type, False)
523
524
def stop_usb_tethering(ad):
    """Stop USB tethering.

    Services are stopped before the USB functions are changed and started
    again once adb reports the device is back.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")
    ad.stop_services()

    # Switch the USB functions, then wait for the device to reappear on adb.
    device_adb = ad.adb
    device_adb.shell(USB_CHARGE_MODE)
    device_adb.wait_for_device()

    ad.start_services()
536
537
def wait_for_new_iface(old_ifaces):
    """Wait for the new interface to come up.

    Polls the host interface list up to 10 times, sleeping 1s after each poll
    that finds nothing new. Fails if more than one new interface appears, or
    if none appears at all.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the single new interface, after it has been enabled
    """
    known_ifaces = set(old_ifaces)
    polls_left = 10
    while polls_left > 0:
        polls_left -= 1
        added = set(get_if_list()).difference(known_ifaces)
        asserts.assert_true(
            len(added) < 2,
            "Too many new interfaces after turning on "
            "tethering")
        if len(added) == 1:
            # Bring the new interface up before handing it back.
            iface = added.pop()
            enable_iface(iface)
            return iface
        time.sleep(1)
    asserts.fail("Timeout waiting for tethering interface on host")
559
560
def get_if_list():
    """Returns a list containing all network interfaces.

    The newest version of Scapy.get_if_list() returns the cached interfaces,
    which might be out-dated, and unable to perceive the interface changes.
    Use this method when the current interface list is needed.
    Reference: https://github.com/secdev/scapy/pull/2707

    The names are parsed from ifconfig output: every line containing
    ": flags" contributes the text before its first colon.

    Returns:
        A list of the latest network interfaces. For example:
        ['cvd-ebr', ..., 'eno1', 'enx4afa19a8dde1', 'lo', 'wlxd03745d68d88']
    """
    from scapy.config import conf
    from scapy.compat import plain_str

    # Run the ifconfig program that scapy is configured with.
    ifconfig_result = job.run([conf.prog.ifconfig])
    if ifconfig_result.exit_status:
        raise asserts.fail(
            "Failed to execute ifconfig: {}".format(
                plain_str(ifconfig_result.stderr)))

    iface_names = []
    for line in plain_str(ifconfig_result.stdout).splitlines():
        if ": flags" in line.lower():
            iface_names.append(line[:line.find(':')])
    return iface_names
587
588
def enable_hardware_offload(ad):
    """Enable hardware offload using adb shell command.

    Writes the setting, reboots the device and then waits
    tel_defines.WAIT_TIME_AFTER_REBOOT seconds.

    Args:
        ad: Android device object
    """
    ad.log.info("Enabling hardware offload.")
    ad.adb.shell(ENABLE_HARDWARE_OFFLOAD, ignore_status=True)
    ad.reboot()
    settle_time = tel_defines.WAIT_TIME_AFTER_REBOOT
    time.sleep(settle_time)
599
600
def disable_hardware_offload(ad):
    """Disable hardware offload using adb shell command.

    Writes the setting, reboots the device and then waits
    tel_defines.WAIT_TIME_AFTER_REBOOT seconds.

    Args:
        ad: Android device object
    """
    ad.log.info("Disabling hardware offload.")
    ad.adb.shell(DISABLE_HARDWARE_OFFLOAD, ignore_status=True)
    ad.reboot()
    settle_time = tel_defines.WAIT_TIME_AFTER_REBOOT
    time.sleep(settle_time)
611
612
def enable_iface(iface):
    """Bring a host network interface up.

    Some network interfaces might be disabled by default and need to be
    enabled before they are used.

    Args:
        iface: network interface that need to enable
    """
    from scapy.compat import plain_str

    up_cmd = "sudo ifconfig %s up" % (iface)
    cmd_result = job.run(up_cmd, ignore_status=True)
    if cmd_result.exit_status:
        raise asserts.fail(
            "Failed to execute ifconfig: {}".format(
                plain_str(cmd_result.stderr)))
628