• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16import logging
17import os
18import re
19import time
20import urllib.request
21
22from acts import asserts
23from acts import signals
24from acts import utils
25from acts.controllers import adb
26from acts.controllers.adb_lib.error import AdbError
27from acts.libs.proc import job
28from acts.utils import start_standing_subprocess
29from acts.utils import stop_standing_subprocess
30from acts_contrib.test_utils.net import connectivity_const as cconst
31from acts_contrib.test_utils.tel import tel_defines
32from acts_contrib.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
33from acts_contrib.test_utils.tel.tel_test_utils import get_operator_name
34from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection
35
36from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
37from scapy.config import conf
38from scapy.compat import plain_str
39
# Short aliases for the VPN constant groups defined in connectivity_const.
VPN_CONST = cconst.VpnProfile
VPN_TYPE = cconst.VpnProfileType
VPN_PARAMS = cconst.VpnReqParams
# Directory used by start_tcpdump/stop_tcpdump for capture files on the device.
TCPDUMP_PATH = "/data/local/tmp/"
# Shell commands selecting the USB function; stop_usb_tethering runs
# USB_CHARGE_MODE through adb shell.
USB_CHARGE_MODE = "svc usb setFunctions"
USB_TETHERING_MODE = "svc usb setFunctions rndis"
# Shell commands writing the "tether_offload_disabled" global setting
# (0 for the enable command, 1 for the disable command).
ENABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 0"
DISABLE_HARDWARE_OFFLOAD = "settings put global tether_offload_disabled 1"
# NOTE(review): not referenced by the functions visible in this file;
# presumably a shell command for listing device addresses - confirm.
DEVICE_IP_ADDRESS = "ip address"
# Server address for which download_load_certs has the device download the
# certificate itself instead of going through the host.
LOCALHOST = "192.168.1.1"

# gcloud command prefixes (note the trailing space) used by the GCE tcpdump
# helpers to build ssh/scp command lines.
GCE_SSH = "gcloud compute ssh "
GCE_SCP = "gcloud compute scp "
53
54
def verify_lte_data_and_tethering_supported(ad):
    """Check that cellular data works and that tethering is supported.

    WiFi is switched off and the data connection switched on before the
    checks run; WiFi is switched back on once both assertions have passed.

    Args:
        ad: android device object
    """
    device_log = ad.log

    # Turn WiFi off so the HTTP check below runs over the cell connection.
    wutils.wifi_toggle_state(ad, False)
    ad.droid.telephonyToggleDataConnection(True)
    wait_for_cell_data_connection(device_log, ad, True)

    http_ok = verify_http_connection(device_log, ad)
    asserts.assert_true(
        http_ok, "HTTP verification failed on cell data connection")

    tethering_ok = ad.droid.connectivityIsTetheringSupported()
    asserts.assert_true(
        tethering_ok, "Tethering is not supported for the provider")

    wutils.wifi_toggle_state(ad, True)
67
68
def set_chrome_browser_permissions(ad):
    """Prepare Chrome so it starts without the first-run flow.

    Grants Chrome read/write storage permissions and writes a Chrome
    command-line file with the no-first-run flags. Each command is
    best-effort: a failing adb command is logged and the next one still runs.

    Args:
        ad: android device object
    """
    storage_grants = (
        "pm grant com.android.chrome "
        "android.permission.READ_EXTERNAL_STORAGE",
        "pm grant com.android.chrome "
        "android.permission.WRITE_EXTERNAL_STORAGE",
    )
    command_line_setup = (
        "rm /data/local/chrome-command-line",
        "am set-debug-app --persistent com.android.chrome",
        'echo "chrome --no-default-browser-check --no-first-run '
        '--disable-fre" > /data/local/tmp/chrome-command-line',
    )

    for shell_cmd in storage_grants + command_line_setup:
        try:
            ad.adb.shell(shell_cmd)
        except AdbError:
            logging.warning(
                "adb command %s failed on %s", shell_cmd, ad.serial)
90
91
def verify_ping_to_vpn_ip(ad, vpn_ping_addr):
    """Verify if IP behind VPN server is pingable.

    Ping should pass, if VPN is connected.
    Ping should fail, if VPN is disconnected.

    Args:
        ad: android device object
        vpn_ping_addr: target ping addr

    Returns:
        True if the ping command produced output that does not report
        "100% packet loss"; False otherwise, including when the adb shell
        command raises AdbError.
    """
    pkt_loss = "100% packet loss"
    logging.info("Pinging: %s", vpn_ping_addr)
    try:
        ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % vpn_ping_addr)
    except AdbError as err:
        # A failing ping is an expected outcome (e.g. VPN disconnected), so
        # report it as "not reachable" instead of propagating the error.
        logging.debug("Ping to %s failed: %s", vpn_ping_addr, err)
        return False
    # Always hand back a real boolean rather than None or an empty string.
    return bool(ping_result) and pkt_loss not in ping_result
110
111
def legacy_vpn_connection_test_logic(ad, vpn_profile, vpn_ping_addr):
    """Run one connect / verify / disconnect cycle for a legacy VPN.

    Steps:
      1. Flush the device's xfrm state and start the legacy VPN
      2. Verify through LegacyVpnInfo that the connection is established
      3. Ping the IP behind the VPN
      4. Log the HMAC entry of the xfrm state when one is present
      5. Stop the VPN connection
      6. Assert that the earlier ping passed
      7. Assert that LegacyVpnInfo no longer reports a connection

    Args:
        ad: Android device object
        vpn_profile: object contains attribute for create vpn profile
        vpn_ping_addr: addr to verify vpn connection
    """
    # Give the VPN server time to flush all interfaces and connections
    # after a graceful termination.
    time.sleep(10)

    ad.adb.shell("ip xfrm state flush")
    ad.log.info("Connecting to: %s", vpn_profile)
    ad.droid.vpnStartLegacyVpn(vpn_profile)
    time.sleep(cconst.VPN_TIMEOUT)

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_equal(
        vpn_info["state"],
        cconst.VPN_STATE_CONNECTED,
        "Unable to establish VPN connection for %s" % vpn_profile)

    ping_passed = verify_ping_to_vpn_ip(ad, vpn_ping_addr)

    # Log the first whitespace-separated token of the "hmac..." match.
    xfrm_dump = ad.adb.shell("ip xfrm state")
    hmac_match = re.search(r'hmac(.*)', "%s" % xfrm_dump)
    if hmac_match:
        hmac_tokens = hmac_match.group(0).split()
        ad.log.info("HMAC for ESP is %s ", hmac_tokens[0])

    # The VPN is stopped before the ping result is asserted - presumably so
    # that a ping failure does not leave the VPN connected.
    ad.droid.vpnStopLegacyVpn()
    asserts.assert_true(
        ping_passed,
        "Ping to the internal IP failed. "
        "Expected to pass as VPN is connected")

    vpn_info = ad.droid.vpnGetLegacyVpnInfo()
    asserts.assert_true(
        not vpn_info,
        "Unable to terminate VPN connection for %s" % vpn_profile)
160
161
def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
                        ipsec_server_type, log_path):
    """Download the certificates from VPN server and push to sdcard of DUT.

    When the server address equals LOCALHOST the device downloads the file
    itself; otherwise the host downloads it into log_path and pushes it to
    the device with adb.

    Args:
      ad: android device object
      vpn_params: vpn params from config file
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Client cert file name on DUT's sdcard

    Raises:
      Whatever asserts.fail raises when the host-side download or the write
      of the local file fails.
    """
    url = "http://%s%s%s" % (vpn_server_addr,
                             vpn_params['cert_path_vpnserver'],
                             vpn_params['client_pkcs_file_name'])
    logging.info("URL is: %s", url)
    if vpn_server_addr == LOCALHOST:
        ad.droid.httpDownloadFile(url, "/sdcard/")
        return vpn_params['client_pkcs_file_name']

    # Prefix the local copy with VPN type and server type so certificates
    # of different configurations do not overwrite each other in log_path.
    local_cert_name = "%s_%s_%s" % (vpn_type.name,
                                    ipsec_server_type,
                                    vpn_params['client_pkcs_file_name'])
    local_file_path = os.path.join(log_path, local_cert_name)
    try:
        # Context managers close both the HTTP response and the local file,
        # even when reading or writing fails part-way.
        with urllib.request.urlopen(url) as response, \
                open(local_file_path, "wb") as cert_file:
            cert_file.write(response.read())
    except Exception:
        # Keep the underlying cause in the log before failing the test.
        logging.exception("Failed to download %s", url)
        asserts.fail("Unable to download certificate from the server")

    ad.adb.push("%s sdcard/" % local_file_path)
    return local_cert_name
198
199
def generate_legacy_vpn_profile(ad,
                                vpn_params,
                                vpn_type,
                                vpn_server_addr,
                                ipsec_server_type,
                                log_path):
    """Build the profile dictionary for a legacy VPN type.

    PSK types get the pre-shared secret, RSA types get a client certificate
    that is downloaded and installed on the device, and every other type
    gets the MPPE entry.

    Args:
      ad: android device object
      vpn_params: vpn params from config file
      vpn_type: 1 of the 6 VPN types
      vpn_server_addr: server addr to connect to
      ipsec_server_type: ipsec version - strongswan or openswan
      log_path: log path to download cert

    Returns:
      Vpn profile
    """
    profile = {
        VPN_CONST.USER: vpn_params['vpn_username'],
        VPN_CONST.PWD: vpn_params['vpn_password'],
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: vpn_server_addr,
    }

    type_name = vpn_type.name
    # PPTP profiles are named without the ipsec server type.
    if type_name == "PPTP":
        profile[VPN_CONST.NAME] = "test_%s" % type_name
    else:
        profile[VPN_CONST.NAME] = "test_%s_%s" % (type_name,
                                                  ipsec_server_type)

    if type_name in ("L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"):
        profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
        return profile

    if type_name in ("L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA",
                     "IPSEC_HYBRID_RSA"):
        cert_file = download_load_certs(ad, vpn_params, vpn_type,
                                        vpn_server_addr, ipsec_server_type,
                                        log_path)
        # The cert entry is the file name up to its first dot.
        profile[VPN_CONST.IPSEC_USER_CERT] = cert_file.split('.')[0]
        ad.droid.installCertificate(profile, cert_file,
                                    vpn_params['cert_password'])
        return profile

    profile[VPN_CONST.MPPE] = "mppe"
    return profile
247
248
def generate_ikev2_vpn_profile(ad, vpn_params, vpn_type, server_addr, log_path):
    """Build the profile dictionary for an IKEv2 VPN type.

    "IKEV2_IPSEC_PSK" needs only identity and secret. The user/password type
    and every remaining type additionally download a certificate (from
    vpn_params["server_addr"]) and install it on the device.

    Args:
        ad: android device object.
        vpn_params: vpn params from config file.
        vpn_type: ikev2 vpn type.
        server_addr: vpn server addr.
        log_path: log path to download cert.

    Returns:
        Vpn profile.
    """
    profile = {
        VPN_CONST.TYPE: vpn_type.value,
        VPN_CONST.SERVER: server_addr,
    }
    type_name = vpn_type.name

    if type_name == "IKEV2_IPSEC_PSK":
        profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        profile[VPN_CONST.IPSEC_SECRET] = vpn_params["psk_secret"]
        return profile

    if type_name == "IKEV2_IPSEC_USER_PASS":
        profile[VPN_CONST.USER] = vpn_params["vpn_username"]
        profile[VPN_CONST.PWD] = vpn_params["vpn_password"]
        profile[VPN_CONST.IPSEC_ID] = vpn_params["vpn_identity"]
        cert_file = download_load_certs(
            ad, vpn_params, vpn_type, vpn_params["server_addr"],
            "IKEV2_IPSEC_USER_PASS", log_path)
        profile[VPN_CONST.IPSEC_CA_CERT] = cert_file.split('.')[0]
    else:
        profile[VPN_CONST.IPSEC_ID] = "%s@%s" % (
            vpn_params["vpn_identity"], server_addr)
        logging.info("ID: %s@%s", vpn_params["vpn_identity"], server_addr)
        cert_file = download_load_certs(
            ad, vpn_params, vpn_type, vpn_params["server_addr"],
            "IKEV2_IPSEC_RSA", log_path)
        # User and CA cert entries share the same value here.
        cert_alias = cert_file.split('.')[0]
        profile[VPN_CONST.IPSEC_USER_CERT] = cert_alias
        profile[VPN_CONST.IPSEC_CA_CERT] = cert_alias

    # Both certificate-based branches install the downloaded file.
    ad.droid.installCertificate(
        profile, cert_file, vpn_params['cert_password'])
    return profile
293
294
def start_tcpdump(ad, test_name, interface="any"):
    """Start a tcpdump capture on the device.

    Any running tcpdump is killed and the capture directory is emptied
    before the new capture is started as a standing subprocess.

    Args:
        ad: android device object.
        test_name: tcpdump file name will have this
        interface: value passed to tcpdump's -i option

    Returns:
        The standing subprocess running tcpdump, or None if it could not
        be started.
    """
    ad.log.info("Starting tcpdump on all interfaces")

    # Best-effort cleanup; failures of these commands are ignored.
    cleanup_cmds = (
        "killall -9 tcpdump",
        "mkdir %s" % TCPDUMP_PATH,
        "rm -rf %s/*" % TCPDUMP_PATH,
    )
    for cleanup_cmd in cleanup_cmds:
        ad.adb.shell(cleanup_cmd, ignore_status=True)

    pcap_path = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
    ad.log.info("tcpdump file is %s", pcap_path)
    capture_cmd = "adb -s {} shell tcpdump -i {} -s0 -w {}".format(
        ad.serial, interface, pcap_path)

    try:
        return start_standing_subprocess(capture_cmd, 5)
    except Exception:
        ad.log.exception(
            'Could not start standing process %s', repr(capture_cmd))
        return None
317
318
def stop_tcpdump(ad,
                 proc,
                 test_name,
                 pull_dump=True,
                 adb_pull_timeout=adb.DEFAULT_ADB_PULL_TIMEOUT):
    """Stop a tcpdump capture and optionally pull the capture files.

    Args:
        ad: android device object.
        proc: standing subprocess to stop; nothing is done when it is None
        test_name: test name to save the tcpdump file
        pull_dump: pull tcpdump file or not
        adb_pull_timeout: timeout for adb_pull

    Returns:
      Host path of the pulled tcpdump file, or None when proc is None or
      pull_dump is false.
    """
    ad.log.info("Stopping and pulling tcpdump if any")
    if proc is None:
        return None

    try:
        stop_standing_subprocess(proc)
    except Exception as err:
        # A failure to stop the process is only logged.
        ad.log.warning(err)

    if not pull_dump:
        return None

    dump_dir = os.path.join(ad.device_log_path, "TCPDUMP_%s" % ad.serial)
    os.makedirs(dump_dir, exist_ok=True)
    # Pull the whole capture directory, then empty it on the device.
    ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, dump_dir),
                timeout=adb_pull_timeout)
    ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)

    pcap_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
    return "%s/%s" % (dump_dir, pcap_name)
354
355
def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
    """Launch tcpdump on the gce server through gcloud ssh.

    Args:
        ad: android device object
        test_name: test case name
        dest_port: port to collect tcpdump
        gce: dictionary of gce instance

    Returns:
       process id and pcap file path from gce server

    Raises:
        signals.TestFailure: if no tcpdump process for the capture file is
            listed on the server.
    """
    ad.log.info("Starting tcpdump on gce server")

    # Capture file path on the server, without extension.
    timestamp = time.strftime('%Y-%m-%d_%H-%M-%S',
                              time.localtime(time.time()))
    fname = "/tmp/%s_%s_%s_%s" % (test_name, ad.model, ad.serial, timestamp)

    # Start tcpdump in the background; its output goes to <fname>.txt.
    tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \
        %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname)
    ssh_prefix = "%s --project=%s --zone=%s %s@%s --command " % (
        GCE_SSH, gce["project"], gce["zone"], gce["username"],
        gce["hostname"])
    utils.exe_cmd('%s "%s"' % (ssh_prefix, tcpdump_cmd))

    # Find the process in the ps listing; the second field is returned as
    # the pid.
    ps_output = utils.exe_cmd(
        '%s "ps aux | grep tcpdump | grep %s"' % (ssh_prefix, fname))
    ps_fields = ps_output.decode("utf-8", "ignore").split()
    if not ps_fields:
        raise signals.TestFailure("Failed to start tcpdump on gce server")
    return ps_fields[1], fname
389
390
def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
    """Kill tcpdump on the gce server and fetch its capture file.

    Args:
        ad: android device object
        tcpdump_pid: process id for tcpdump file
        fname: tcpdump file path
        gce: dictionary of gce instance

    Returns:
       Local path of the pulled pcap file, or None when fname is empty.

    Raises:
        signals.TestFailure: if the pid is still listed after the kill or
            the pcap file is missing locally after the copy.
    """
    ad.log.info("Stop and pull pcap file from gce server")

    target = (gce["project"], gce["zone"], gce["username"], gce["hostname"])
    ssh_prefix = "%s --project=%s --zone=%s %s@%s --command " % (
        (GCE_SSH,) + target)

    # Stop tcpdump.
    kill_cmd = "sudo kill %s" % tcpdump_pid
    utils.exe_cmd('%s "%s"' % (ssh_prefix, kill_cmd))

    # The pid must no longer appear in the process listing.
    ps_output = utils.exe_cmd('%s "ps aux | grep tcpdump"' % ssh_prefix)
    if tcpdump_pid in ps_output.decode("utf-8", "ignore").split():
        raise signals.TestFailure("Failed to stop tcpdump on gce server")
    if not fname:
        return None

    # Copy the pcap into the device log directory.
    scp_prefix = "%s --project=%s --zone=%s %s@%s:" % ((GCE_SCP,) + target)
    log_dir = ad.device_log_path
    local_pcap = "%s/%s.pcap" % (log_dir, fname.split('/')[-1])
    utils.exe_cmd('%s%s.pcap %s/' % (scp_prefix, fname, log_dir))
    if not os.path.exists(local_pcap):
        raise signals.TestFailure("Failed to pull tcpdump from gce server")

    # Remove the capture files from the server.
    utils.exe_cmd('%s "sudo rm %s.*"' % (ssh_prefix, fname))

    return local_pcap
435
436
def is_ipaddress_ipv6(ip_address):
    """Verify if the given string is a valid IPv6 address.

    Args:
        ip_address: string containing the IP address

    Returns:
        True: if valid ipv6 address
        False: if not (including input that is not a string)
    """
    # Imported here because the module's import block does not import
    # socket, so the name would otherwise be undefined at call time.
    import socket

    try:
        socket.inet_pton(socket.AF_INET6, ip_address)
    except (OSError, TypeError, ValueError):
        # OSError: not a valid IPv6 literal; TypeError: non-string input;
        # ValueError: string with an embedded null character.
        return False
    return True
452
453
def carrier_supports_ipv6(dut):
    """Tell whether the device's operator is on the IPv6-capable list.

    Args:
        dut: Android device that is being checked

    Returns:
        True: if carrier supports ipv6
        False: if not
    """
    ipv6_carriers = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
    # The literal "log" is passed as the logger argument, as before.
    return get_operator_name("log", dut) in ipv6_carriers
468
469
def is_carrier_supports_entitlement(dut):
    """Tell whether the device's operator is on the entitlement list.

    Args:
        dut: Android device

    Return:
        True if carrier supports entitlement, otherwise false
    """
    entitlement_carriers = [
        tel_defines.CARRIER_VZW,
        tel_defines.CARRIER_ATT,
        tel_defines.CARRIER_TMO,
        tel_defines.CARRIER_SBM,
    ]
    # The literal "log" is passed as the logger argument, as before.
    return get_operator_name("log", dut) in entitlement_carriers
487
488
def set_cap_net_raw_capability():
    """Grant CAP_NET_RAW to the resolved act.py and python executables.

    To send the Scapy packets, we need to get the CAP_NET_RAW capability first.
    """
    setcap_cmds = (
        "sudo setcap cap_net_raw=eip $(readlink -f $(which act.py))",
        "sudo setcap cap_net_raw=eip $(readlink -f $(which python))",
    )
    for setcap_cmd in setcap_cmds:
        utils.exe_cmd(setcap_cmd)
498
499
def supports_ipv6_tethering(self, dut):
    """Tell whether the device's operator is on the IPv6 tethering list.

    Args:
        self: object providing the `log` attribute passed to the operator
            lookup
        dut: Android device that is being checked

    Returns:
        True: if provider supports IPv6 tethering
        False: if not
    """
    tethering_carriers = ["vzw", "tmo", "Far EasTone", "Chunghwa Telecom"]
    return get_operator_name(self.log, dut) in tethering_carriers
513
514
def start_usb_tethering(ad):
    """Turn on USB tethering through the #startTethering API.

    Enabling USB tethering this way breaks the RPC session, so callers
    should call #ad.recreate_services afterwards - b/149116235

    Args:
        ad: AndroidDevice object
    """
    tethering_type = tel_defines.TETHERING_USB
    ad.droid.connectivityStartTethering(tethering_type, False)
525
526
def stop_usb_tethering(ad):
    """Turn off USB tethering by resetting the USB function.

    Services are stopped before the USB function changes and started again
    once adb reports the device as available.

    Args:
        ad: android device object
    """
    ad.log.info("Stopping USB Tethering")
    ad.stop_services()

    device_adb = ad.adb
    device_adb.shell(USB_CHARGE_MODE)
    device_adb.wait_for_device()

    ad.start_services()
538
539
def wait_for_new_iface(old_ifaces):
    """Poll the host until exactly one new network interface shows up.

    Args:
        old_ifaces: list of old interfaces

    Returns:
        Name of the new interface, which is enabled before it is returned.
        The test is failed through asserts when more than one new interface
        appears or none appears within the polling window.
    """
    known = set(old_ifaces)
    # Up to 10 polls, sleeping 1s after every poll that finds nothing.
    for _ in range(10):
        added = set(get_if_list()).difference(known)
        asserts.assert_true(len(added) < 2,
                            "Too many new interfaces after turning on "
                            "tethering")
        if not added:
            time.sleep(1)
            continue
        # Exactly one candidate: bring it up before handing it back.
        iface_name = added.pop()
        enable_iface(iface_name)
        return iface_name
    asserts.fail("Timeout waiting for tethering interface on host")
561
562
def get_if_list():
    """Return the host's current network interfaces, parsed from ifconfig.

    The newest version of Scapy.get_if_list() returns cached interfaces,
    which may be out of date and miss interface changes. Use this function
    when interface changes need to be monitored.
    Reference: https://github.com/secdev/scapy/pull/2707

    Returns:
        A list of the latest network interfaces. For example:
        ['cvd-ebr', ..., 'eno1', 'enx4afa19a8dde1', 'lo', 'wlxd03745d68d88']
    """
    ifconfig_run = job.run([conf.prog.ifconfig])
    if ifconfig_run.exit_status:
        raise asserts.fail(
            "Failed to execute ifconfig: {}".format(
                plain_str(ifconfig_run.stderr)))

    iface_names = []
    for row in plain_str(ifconfig_run.stdout).splitlines():
        # Header rows look like "<name>: flags=..."; keep the part before
        # the first colon.
        if ": flags" in row.lower():
            iface_names.append(row.partition(':')[0])
    return iface_names
587
588
def enable_hardware_offload(ad):
    """Turn hardware offload on via adb shell, then reboot the device.

    Args:
        ad: Android device object
    """
    ad.log.info("Enabling hardware offload.")
    ad.adb.shell(ENABLE_HARDWARE_OFFLOAD, ignore_status=True)
    ad.reboot()
    # Wait the configured post-reboot time before returning.
    settle_time = tel_defines.WAIT_TIME_AFTER_REBOOT
    time.sleep(settle_time)
599
600
def disable_hardware_offload(ad):
    """Turn hardware offload off via adb shell, then reboot the device.

    Args:
        ad: Android device object
    """
    ad.log.info("Disabling hardware offload.")
    ad.adb.shell(DISABLE_HARDWARE_OFFLOAD, ignore_status=True)
    ad.reboot()
    # Wait the configured post-reboot time before returning.
    settle_time = tel_defines.WAIT_TIME_AFTER_REBOOT
    time.sleep(settle_time)
611
612
def enable_iface(iface):
    """Bring a host network interface up with ifconfig.

    Some network interfaces may be disabled by default and need to be
    enabled before they are used.

    Args:
        iface: network interface that need to enable
    """
    outcome = job.run("sudo ifconfig %s up" % iface, ignore_status=True)
    if not outcome.exit_status:
        return
    raise asserts.fail(
        "Failed to execute ifconfig: {}".format(plain_str(outcome.stderr)))
626