• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import logging
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib import utils
12from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
13
14
15# Used to represent stations we parse out of scan results.
16Station = collections.namedtuple('Station',
17                                 ['bssid', 'frequency', 'signal', 'ssid'])
18
19class WpaCliProxy(object):
20    """Interacts with a DUT through wpa_cli rather than shill."""
21
22    SCANNING_INTERVAL_SECONDS = 5
23    POLLING_INTERVAL_SECONDS = 0.5
24    # From wpa_supplicant.c:wpa_supplicant_state_txt()
25    WPA_SUPPLICANT_ASSOCIATING_STATES = (
26            'AUTHENTICATING',
27            'ASSOCIATING',
28            'ASSOCIATED',
29            '4WAY_HANDSHAKE',
30            'GROUP_HANDSHAKE')
31    WPA_SUPPLICANT_ASSOCIATED_STATES = (
32            'COMPLETED',)
33    ANDROID_CMD_FORMAT = '/system/bin/wpa_cli IFNAME={0[ifname]} {0[cmd]}'
34    BRILLO_CMD_FORMAT = 'su system /system/bin/wpa_cli -i{0[ifname]} -p/data/misc/wifi/sockets {0[cmd]}'
35    CROS_CMD_FORMAT = ('su wpa -s /bin/bash '
36                       '-c "/usr/bin/wpa_cli -i {0[ifname]} {0[cmd]}"')
37    CAST_CMD_FORMAT = '/system/bin/wpa_cli -i {0[ifname]} {0[cmd]}'
38
39
40    def __init__(self, host, wifi_if):
41        self._host = host
42        self._wifi_if = wifi_if
43        self._created_networks = {}
44        # TODO(wiley) Hardcoding this IFNAME prefix makes some big assumptions.
45        #             we'll need to discover this parameter as it becomes more
46        #             generally useful.
47        if host.get_os_type() == 'android':
48            self._wpa_cli_cmd_format = self.ANDROID_CMD_FORMAT
49        elif host.get_os_type() == 'brillo':
50            self._wpa_cli_cmd_format = self.BRILLO_CMD_FORMAT
51        elif host.get_os_type() == 'cros':
52            self._wpa_cli_cmd_format = self.CROS_CMD_FORMAT
53        elif host.get_os_type() == 'cast_os':
54            self._wpa_cli_cmd_format = self.CAST_CMD_FORMAT
55
56
57    def _add_network(self, ssid):
58        """
59        Add a wpa_supplicant network for ssid.
60
61        @param ssid string: name of network to add.
62        @return int network id of added network.
63
64        """
65        add_result = self.run_wpa_cli_cmd('add_network', check_result=False)
66        network_id = int(add_result.stdout.splitlines()[-1])
67        self.run_wpa_cli_cmd('set_network %d ssid \\"%s\\"' %
68                             (network_id, ssid))
69        self._created_networks[ssid] = network_id
70        logging.debug('Added network %s=%d', ssid, network_id)
71        return network_id
72
73
74    def run_wpa_cli_cmd(self, command, check_result=True):
75        """
76        Run a wpa_cli command and optionally check the result.
77
78        Note: if you're using this function to do things like initiating scans,
79        consider initating those through Shill instead, to avoid collisions.
80
81        @param command string: suffix of a command to be prefixed with
82                an appropriate wpa_cli for this host.
83        @param check_result bool: True iff we want to check that the
84                command comes back with an 'OK' response.
85        @return result object returned by host.run.
86
87        """
88        cmd = self._wpa_cli_cmd_format.format(
89                {'ifname' : self._wifi_if, 'cmd' : command})
90        result = self._host.run(cmd)
91        if check_result and not result.stdout.strip().endswith('OK'):
92            raise error.TestFail('wpa_cli command failed: %s' % command)
93
94        return result
95
96
97    def _get_status_dict(self):
98        """
99        Gets the status output for a WiFi interface.
100
101        Get the output of wpa_cli status.  This summarizes what wpa_supplicant
102        is doing with respect to the WiFi interface.
103
104        Example output:
105
106            Using interface 'wlan0'
107            wpa_state=INACTIVE
108            p2p_device_address=32:76:6f:f2:a6:c4
109            address=30:76:6f:f2:a6:c4
110
111        @return dict of key/value pairs parsed from output using = as divider.
112
113        """
114        status_result = self.run_wpa_cli_cmd('status', check_result=False)
115        return dict([line.strip().split('=', 1)
116                     for line in status_result.stdout.splitlines()
117                     if line.find('=') > 0])
118
119
120    def _is_associating_or_associated(self):
121        """@return True if the DUT is assocating or associated with a BSS."""
122        state = self._get_status_dict().get('wpa_state', None)
123        return state in (self.WPA_SUPPLICANT_ASSOCIATING_STATES +
124                         self.WPA_SUPPLICANT_ASSOCIATED_STATES)
125
126
127    def _is_associated(self, ssid):
128        """
129        Check if the DUT is associated to a given SSID.
130
131        @param ssid string: SSID of the network we're concerned about.
132        @return True if we're associated with the specified SSID.
133
134        """
135        status_dict = self._get_status_dict()
136        return (status_dict.get('ssid', None) == ssid and
137                status_dict.get('wpa_state', None) in
138                        self.WPA_SUPPLICANT_ASSOCIATED_STATES)
139
140
141    def _is_connected(self, ssid):
142        """
143        Check that we're connected to |ssid| and have an IP address.
144
145        @param ssid string: SSID of the network we're concerned about.
146        @return True if we have an IP and we're associated with |ssid|.
147
148        """
149        status_dict = self._get_status_dict()
150        return (status_dict.get('ssid', None) == ssid and
151                status_dict.get('ip_address', None))
152
153
154    def clean_profiles(self):
155        """Remove state associated with past networks we've connected to."""
156        # list_networks output looks like:
157        # Using interface 'wlan0'^M
158        # network id / ssid / bssid / flags^M
159        # 0    SimpleConnect_jstja_ch1 any     [DISABLED]^M
160        # 1    SimpleConnect_gjji2_ch6 any     [DISABLED]^M
161        # 2    SimpleConnect_xe9d1_ch11        any     [DISABLED]^M
162        list_networks_result = self.run_wpa_cli_cmd(
163                'list_networks', check_result=False)
164        start_parsing = False
165        for line in list_networks_result.stdout.splitlines():
166            if not start_parsing:
167                if line.startswith('network id'):
168                    start_parsing = True
169                continue
170
171            network_id = int(line.split()[0])
172            self.run_wpa_cli_cmd('remove_network %d' % network_id)
173        self._created_networks = {}
174
175
176    def create_profile(self, _):
177        """
178        This is a no op, since we don't have profiles.
179
180        @param _ ignored.
181
182        """
183        logging.info('Skipping create_profile on %s', self.__class__.__name__)
184
185
186    def pop_profile(self, _):
187        """
188        This is a no op, since we don't have profiles.
189
190        @param _ ignored.
191
192        """
193        logging.info('Skipping pop_profile on %s', self.__class__.__name__)
194
195
196    def push_profile(self, _):
197        """
198        This is a no op, since we don't have profiles.
199
200        @param _ ignored.
201
202        """
203        logging.info('Skipping push_profile on %s', self.__class__.__name__)
204
205
206    def remove_profile(self, _):
207        """
208        This is a no op, since we don't have profiles.
209
210        @param _ ignored.
211
212        """
213        logging.info('Skipping remove_profile on %s', self.__class__.__name__)
214
215
216    def init_test_network_state(self):
217        """Create a clean slate for tests with respect to remembered networks.
218
219        For wpa_cli hosts, this means removing all remembered networks.
220
221        @return True iff operation succeeded, False otherwise.
222
223        """
224        self.clean_profiles()
225        return True
226
227
228    def connect_wifi(self, assoc_params):
229        """
230        Connect to the WiFi network described by AssociationParameters.
231
232        @param assoc_params AssociationParameters object.
233        @return serialized AssociationResult object.
234
235        """
236        logging.debug('connect_wifi()')
237        # Ouptut should look like:
238        #   Using interface 'wlan0'
239        #   0
240        assoc_result = xmlrpc_datatypes.AssociationResult()
241        network_id = self._add_network(assoc_params.ssid)
242        if assoc_params.is_hidden:
243            self.run_wpa_cli_cmd('set_network %d %s %s' %
244                                 (network_id, 'scan_ssid', '1'))
245
246        sec_config = assoc_params.security_config
247        for field, value in sec_config.get_wpa_cli_properties().iteritems():
248            self.run_wpa_cli_cmd('set_network %d %s %s' %
249                                 (network_id, field, value))
250        self.run_wpa_cli_cmd('select_network %d' % network_id)
251
252        # Wait for an appropriate BSS to appear in scan results.
253        scan_results_pattern = '\t'.join(['([0-9a-f:]{17})', # BSSID
254                                          '([0-9]+)',  # Frequency
255                                          '(-[0-9]+)',  # Signal level
256                                          '(.*)',  # Encryption types
257                                          '(.*)'])  # SSID
258        last_scan_time = -1.0
259        start_time = time.time()
260        while time.time() - start_time < assoc_params.discovery_timeout:
261            assoc_result.discovery_time = time.time() - start_time
262            if self._is_associating_or_associated():
263                # Internally, wpa_supplicant writes its scan_results response
264                # to a 4kb buffer.  When there are many BSS's, the buffer fills
265                # up, and we'll never see the BSS we care about in some cases.
266                break
267
268            scan_result = self.run_wpa_cli_cmd('scan_results',
269                                               check_result=False)
270            found_stations = []
271            for line in scan_result.stdout.strip().splitlines():
272                match = re.match(scan_results_pattern, line)
273                if match is None:
274                    continue
275                found_stations.append(
276                        Station(bssid=match.group(1), frequency=match.group(2),
277                                signal=match.group(3), ssid=match.group(5)))
278            logging.debug('Found stations: %r',
279                          [station.ssid for station in found_stations])
280            if [station for station in found_stations
281                    if station.ssid == assoc_params.ssid]:
282                break
283
284            if time.time() - last_scan_time > self.SCANNING_INTERVAL_SECONDS:
285                # Sometimes this might fail with a FAIL-BUSY if the previous
286                # scan hasn't finished.
287                scan_result = self.run_wpa_cli_cmd('scan', check_result=False)
288                if scan_result.stdout.strip().endswith('OK'):
289                    last_scan_time = time.time()
290            time.sleep(self.POLLING_INTERVAL_SECONDS)
291        else:
292            assoc_result.failure_reason = 'Discovery timed out'
293            return assoc_result.serialize()
294
295        # Wait on association to finish.
296        start_time = time.time()
297        success = utils.poll_for_condition(
298                condition=lambda: self._is_associated(assoc_params.ssid),
299                timeout=assoc_params.association_timeout,
300                sleep_interval=self.POLLING_INTERVAL_SECONDS,
301                desc='Wait on association to finish')
302        assoc_result.association_time = time.time() - start_time
303        if not success:
304            assoc_result.failure_reason = 'Association timed out'
305            return assoc_result.serialize()
306
307        # Then wait for ip configuration to finish.
308        start_time = time.time()
309        success = utils.poll_for_condition(
310                condition=lambda: self._is_connected(assoc_params.ssid),
311                timeout=assoc_params.configuration_timeout,
312                sleep_interval=self.POLLING_INTERVAL_SECONDS,
313                desc='Wait for ip configuration to finish')
314        assoc_result.configuration_time = time.time() - start_time
315        if not success:
316            assoc_result.failure_reason = 'DHCP negotiation timed out'
317            return assoc_result.serialize()
318
319        assoc_result.success = True
320        logging.info('Connected to %s', assoc_params.ssid)
321        return assoc_result.serialize()
322
323
324    def disconnect(self, ssid):
325        """
326        Disconnect from a WiFi network named |ssid|.
327
328        @param ssid string: name of network to disable in wpa_supplicant.
329
330        """
331        logging.debug('disconnect()')
332        if ssid not in self._created_networks:
333            return False
334        self.run_wpa_cli_cmd('disable_network %d' %
335                             self._created_networks[ssid])
336        return True
337
338
339    def delete_entries_for_ssid(self, ssid):
340        """Delete a profile entry.
341
342        @param ssid string of WiFi service for which to delete entries.
343        @return True on success, False otherwise.
344        """
345        return self.disconnect(ssid)
346
347
348    def set_device_enabled(self, wifi_interface, enabled):
349        """Enable or disable the WiFi device.
350
351        @param wifi_interface: string name of interface being modified.
352        @param enabled: boolean; true if this device should be enabled,
353                false if this device should be disabled.
354        @return True if it worked; false, otherwise
355
356        """
357        return False
358
359
360    def sync_time_to(self, epoch_seconds):
361        """
362        Sync time on the DUT to |epoch_seconds| from the epoch.
363
364        @param epoch_seconds float: number of seconds since the epoch.
365
366        """
367        # This will claim to fail, but will work anyway.
368        self._host.run('date -u %f' % epoch_seconds, ignore_status=True)
369