• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5
6from autotest_lib.client.bin import test
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.common_lib import utils
9from autotest_lib.client.common_lib.cros import site_eap_certs
10from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
11from autotest_lib.client.cros import certificate_util
12from autotest_lib.client.cros import shill_temporary_profile
13from autotest_lib.client.cros import tpm_store
14from autotest_lib.client.cros import vpn_server
15from autotest_lib.client.cros.networking import shill_proxy
16
class network_VPNConnect(test.test):
    """The VPN authentication class.

    Starts up a VPN server within a chroot on the other end of a virtual
    ethernet pair and attempts a VPN association using shill.

    The |vpn_type| string selects both the server flavor and the client
    configuration.  Any type containing the substring 'incorrect' is
    expected to fail to connect.

    """
    CLIENT_INTERFACE_NAME = 'pseudoethernet0'
    SERVER_INTERFACE_NAME = 'serverethernet0'
    TEST_PROFILE_NAME = 'testVPN'
    CONNECT_TIMEOUT_SECONDS = 15
    version = 1
    SERVER_ADDRESS = '10.9.8.1'
    CLIENT_ADDRESS = '10.9.8.2'
    NETWORK_PREFIX = 24

    def get_device(self, interface_name):
        """Finds the corresponding Device object for an ethernet
        interface with the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.

        @raises error.TestFail if no matching device is found.

        """
        device = self._shill_proxy.find_object('Device',
                                               {'Name': interface_name})
        if device is None:
            raise error.TestFail('Device was not found.')

        return device


    def find_ethernet_service(self, interface_name):
        """Finds the corresponding service object for an ethernet
        interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service, or
                whatever find_object() yields when nothing matches.

        @raises error.TestFail if the underlying device is not found.

        """
        device = self.get_device(interface_name)
        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self._shill_proxy.find_object('Service', {'Device': device_path})


    def configure_static_ip(self, interface_name, address, prefix_len):
        """Configures the Static IP parameters for the Ethernet interface
        |interface_name| and applies those parameters to the interface by
        forcing a re-connect.

        @param interface_name string The name of the associated interface.
        @param address string the IP address this interface should have.
        @param prefix_len int the IP address prefix length for the interface.

        @raises error.TestFail if the device or its service is not found.

        """
        service = self.find_ethernet_service(interface_name)
        # find_object() can come back empty (see get_device); fail the test
        # with a clear message rather than an AttributeError on None.
        if service is None:
            raise error.TestFail('Service was not found.')

        service.SetProperty('StaticIP.Address', address)
        service.SetProperty('StaticIP.Prefixlen', prefix_len)
        # Bounce the connection so the new static parameters are applied.
        service.Disconnect()
        service.Connect()


    def get_vpn_server(self):
        """Returns a VPN server instance.

        The server flavor is chosen from the prefix of |self._vpn_type|.

        @return VPN server object matching the current VPN type.

        @raises error.TestFail if the VPN type is not recognized.

        """
        if self._vpn_type.startswith('l2tpipsec-psk'):
            return vpn_server.L2TPIPSecVPNServer('psk',
                                                 self.SERVER_INTERFACE_NAME,
                                                 self.SERVER_ADDRESS,
                                                 self.NETWORK_PREFIX,
                                                 'xauth' in self._vpn_type)
        elif self._vpn_type.startswith('l2tpipsec-cert'):
            return vpn_server.L2TPIPSecVPNServer('cert',
                                                 self.SERVER_INTERFACE_NAME,
                                                 self.SERVER_ADDRESS,
                                                 self.NETWORK_PREFIX)
        elif self._vpn_type.startswith('openvpn'):
            return vpn_server.OpenVPNServer(self.SERVER_INTERFACE_NAME,
                                            self.SERVER_ADDRESS,
                                            self.NETWORK_PREFIX,
                                            'user_pass' in self._vpn_type)
        else:
            raise error.TestFail('Unknown vpn server type %s' % self._vpn_type)


    def get_vpn_client_properties(self, tpm):
        """Returns VPN configuration properties.

        @param tpm object TPM store instance to add credentials if necessary.

        @return dict of service properties for the current VPN type.

        @raises error.TestFail if the VPN type is not recognized.

        """
        if self._vpn_type.startswith('l2tpipsec-psk'):
            params = {
                'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET,
                'L2TPIPsec.PSK':
                        vpn_server.L2TPIPSecVPNServer.IPSEC_PRESHARED_KEY,
                'L2TPIPsec.User': vpn_server.L2TPIPSecVPNServer.CHAP_USER,
                'Name': 'test-vpn-l2tp-psk',
                'Provider.Host': self.SERVER_ADDRESS,
                'Provider.Type': 'l2tpipsec',
                'Type': 'vpn',
                'VPN.Domain': 'test-vpn-psk-domain'
            }
            if 'xauth' in self._vpn_type:
                if 'incorrect_user' in self._vpn_type:
                    params['L2TPIPsec.XauthUser'] = 'wrong_user'
                    params['L2TPIPsec.XauthPassword'] = 'wrong_password'
                elif 'incorrect_missing_user' not in self._vpn_type:
                    params['L2TPIPsec.XauthUser'] = (
                            vpn_server.L2TPIPSecVPNServer.XAUTH_USER)
                    params['L2TPIPsec.XauthPassword'] = (
                            vpn_server.L2TPIPSecVPNServer.XAUTH_PASSWORD)
                # For 'incorrect_missing_user' no XAUTH credentials are set.
            return params
        # Prefix match keeps this in step with get_vpn_server(), which also
        # selects the cert server with startswith('l2tpipsec-cert').
        elif self._vpn_type.startswith('l2tpipsec-cert'):
            tpm.install_certificate(site_eap_certs.client_cert_1,
                                    site_eap_certs.cert_1_tpm_key_id)
            tpm.install_private_key(site_eap_certs.client_private_key_1,
                                    site_eap_certs.cert_1_tpm_key_id)
            return {
                'L2TPIPsec.CACertPEM': [ site_eap_certs.ca_cert_1 ],
                'L2TPIPsec.ClientCertID': site_eap_certs.cert_1_tpm_key_id,
                'L2TPIPsec.ClientCertSlot': tpm.SLOT_ID,
                'L2TPIPsec.User': vpn_server.L2TPIPSecVPNServer.CHAP_USER,
                'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET,
                'L2TPIPsec.PIN': tpm.PIN,
                'Name': 'test-vpn-l2tp-cert',
                'Provider.Host': self.SERVER_ADDRESS,
                'Provider.Type': 'l2tpipsec',
                'Type': 'vpn',
                'VPN.Domain': 'test-vpn-psk-domain'
            }
        elif self._vpn_type.startswith('openvpn'):
            tpm.install_certificate(site_eap_certs.client_cert_1,
                                    site_eap_certs.cert_1_tpm_key_id)
            tpm.install_private_key(site_eap_certs.client_private_key_1,
                                    site_eap_certs.cert_1_tpm_key_id)
            params = {
                'Name': 'test-vpn-openvpn',
                'Provider.Host': self.SERVER_ADDRESS,
                'Provider.Type': 'openvpn',
                'Type': 'vpn',
                'VPN.Domain': 'test-openvpn-domain',
                'OpenVPN.CACertPEM': [ site_eap_certs.ca_cert_1 ],
                'OpenVPN.Pkcs11.ID': site_eap_certs.cert_1_tpm_key_id,
                'OpenVPN.Pkcs11.PIN': tpm.PIN,
                'OpenVPN.RemoteCertEKU': 'TLS Web Server Authentication',
                'OpenVPN.Verb': '5'
            }
            if 'user_pass' in self._vpn_type:
                params['OpenVPN.User'] = vpn_server.OpenVPNServer.USERNAME
                params['OpenVPN.Password'] = vpn_server.OpenVPNServer.PASSWORD
            if 'cert_verify' in self._vpn_type:
                ca = certificate_util.PEMCertificate(site_eap_certs.ca_cert_1)
                if 'incorrect_hash' in self._vpn_type:
                    # Deliberately wrong: twenty '00' octets.
                    bogus_hash = ':'.join(['00'] * 20)
                    params['OpenVPN.VerifyHash'] = bogus_hash
                else:
                    params['OpenVPN.VerifyHash'] = ca.fingerprint
                server = certificate_util.PEMCertificate(
                        site_eap_certs.server_cert_1)
                if 'incorrect_subject' in self._vpn_type:
                    params['OpenVPN.VerifyX509Name'] = 'bogus subject name'
                elif 'incorrect_cn' in self._vpn_type:
                    params['OpenVPN.VerifyX509Name'] = 'bogus cn'
                    params['OpenVPN.VerifyX509Type'] = 'name'
                elif 'cn_only' in self._vpn_type:
                    params['OpenVPN.VerifyX509Name'] = server.subject_dict['CN']
                    params['OpenVPN.VerifyX509Type'] = 'name'
                else:
                    # This is the form OpenVPN expects.
                    params['OpenVPN.VerifyX509Name'] = ', '.join(server.subject)
            return params
        else:
            raise error.TestFail('Unknown vpn client type %s' % self._vpn_type)


    def connect_vpn(self):
        """Connects the client to the VPN server.

        Waits up to CONNECT_TIMEOUT_SECONDS for the service state to become
        'ready' or 'online' and compares the outcome with the expectation
        stored in |self._expect_success|.

        @raises error.TestFail if the connection outcome does not match the
                expectation.

        """
        proxy = self._shill_proxy
        # The TPM store only needs to live while the service is configured
        # and the connection attempt is in flight.
        with tpm_store.TPMStore() as tpm:
            service = proxy.get_service(self.get_vpn_client_properties(tpm))
            service.Connect()
            result = proxy.wait_for_property_in(service,
                                                proxy.SERVICE_PROPERTY_STATE,
                                                ('ready', 'online'),
                                                self.CONNECT_TIMEOUT_SECONDS)
        (successful, _, _) = result
        if not successful and self._expect_success:
            raise error.TestFail('VPN connection failed')
        if successful and not self._expect_success:
            raise error.TestFail('VPN connection suceeded '
                                 'when it should have failed')


    def run_once(self, vpn_types=()):
        """Test main loop.

        @param vpn_types iterable of strings naming the VPN tests to run.

        """
        self._shill_proxy = shill_proxy.ShillProxy()
        for vpn_type in vpn_types:
            self.run_vpn_test(vpn_type)


    def run_vpn_test(self, vpn_type):
        """Run a vpn test of |vpn_type|.

        @param vpn_type string type of VPN test to run.

        @raises error.TestFail if the virtual ethernet pair is unhealthy, the
                VPN type is unknown, or the connection outcome is unexpected.

        """
        manager = self._shill_proxy.manager
        server_address_and_prefix = '%s/%d' % (self.SERVER_ADDRESS,
                                               self.NETWORK_PREFIX)
        client_address_and_prefix = '%s/%d' % (self.CLIENT_ADDRESS,
                                               self.NETWORK_PREFIX)
        self._vpn_type = vpn_type
        # Types with 'incorrect' in their name must fail to connect.
        self._expect_success = 'incorrect' not in vpn_type

        with shill_temporary_profile.ShillTemporaryProfile(
                manager, profile_name=self.TEST_PROFILE_NAME):
            with virtual_ethernet_pair.VirtualEthernetPair(
                    interface_name=self.SERVER_INTERFACE_NAME,
                    peer_interface_name=self.CLIENT_INTERFACE_NAME,
                    peer_interface_ip=client_address_and_prefix,
                    interface_ip=server_address_and_prefix,
                    ignore_shutdown_errors=True) as ethernet_pair:
                if not ethernet_pair.is_healthy:
                    raise error.TestFail('Virtual ethernet pair failed.')

                # When shill finds this ethernet interface, it will reset
                # its IP address and start a DHCP client.  We must configure
                # the static IP address through shill.
                self.configure_static_ip(self.CLIENT_INTERFACE_NAME,
                                         self.CLIENT_ADDRESS,
                                         self.NETWORK_PREFIX)

                with self.get_vpn_server() as server:
                    self.connect_vpn()
                    # NOTE(review): the result of utils.ping() is not
                    # inspected here -- confirm whether a failed ping
                    # should fail the test.
                    utils.ping(server.SERVER_IP_ADDRESS, tries=3)
254