• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import logging
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
12
13
# Record describing one station (BSS) parsed out of wpa_cli scan results.
Station = collections.namedtuple('Station', 'bssid frequency signal ssid')
17
class WpaCliProxy(object):
    """Interacts with a DUT through wpa_cli rather than shill.

    Commands are built from one of the *_CMD_FORMAT templates below, chosen
    by the host's OS type, and executed through the host object's run().

    """

    # connect_wifi() requests a new scan only when more than this many
    # seconds have passed since the last scan request that returned OK.
    SCANNING_INTERVAL_SECONDS = 5
    # Seconds slept between consecutive polls in _wait_until() and in the
    # discovery loop of connect_wifi().
    POLLING_INTERVAL_SECONDS = 0.5
    # From wpa_supplicant.c:wpa_supplicant_state_txt()
    # wpa_state values treated as "association in progress".
    WPA_SUPPLICANT_ASSOCIATING_STATES = (
            'AUTHENTICATING',
            'ASSOCIATING',
            'ASSOCIATED',
            '4WAY_HANDSHAKE',
            'GROUP_HANDSHAKE')
    # wpa_state values treated as "association finished".
    WPA_SUPPLICANT_ASSOCIATED_STATES = (
            'COMPLETED',)
    # Command templates, formatted by run_wpa_cli_cmd() with a single dict
    # that provides 'ifname' and 'cmd'.  The CROS and CAST templates use
    # only 'cmd'.
    ANDROID_CMD_FORMAT = '/system/bin/wpa_cli IFNAME={0[ifname]} {0[cmd]}'
    BRILLO_CMD_FORMAT = 'su system /system/bin/wpa_cli -i{0[ifname]} -p/data/misc/wifi/sockets {0[cmd]}'
    CROS_CMD_FORMAT = 'su wpa -s /bin/bash -c "/usr/bin/wpa_cli {0[cmd]}"'
    CAST_CMD_FORMAT = '/system/bin/wpa_cli {0[cmd]}'
36
37
38    def __init__(self, host, wifi_if):
39        self._host = host
40        self._wifi_if = wifi_if
41        self._created_networks = {}
42        # TODO(wiley) Hardcoding this IFNAME prefix makes some big assumptions.
43        #             we'll need to discover this parameter as it becomes more
44        #             generally useful.
45        if host.get_os_type() == 'android':
46            self._wpa_cli_cmd_format = self.ANDROID_CMD_FORMAT
47        elif host.get_os_type() == 'brillo':
48            self._wpa_cli_cmd_format = self.BRILLO_CMD_FORMAT
49        elif host.get_os_type() == 'cros':
50            self._wpa_cli_cmd_format = self.CROS_CMD_FORMAT
51        elif host.get_os_type() == 'cast_os':
52            self._wpa_cli_cmd_format = self.CAST_CMD_FORMAT
53
54
55    def _add_network(self, ssid):
56        """
57        Add a wpa_supplicant network for ssid.
58
59        @param ssid string: name of network to add.
60        @return int network id of added network.
61
62        """
63        add_result = self.run_wpa_cli_cmd('add_network', check_result=False)
64        network_id = int(add_result.stdout.splitlines()[-1])
65        self.run_wpa_cli_cmd('set_network %d ssid \\"%s\\"' %
66                             (network_id, ssid))
67        self._created_networks[ssid] = network_id
68        logging.debug('Added network %s=%d', ssid, network_id)
69        return network_id
70
71
72    def run_wpa_cli_cmd(self, command, check_result=True):
73        """
74        Run a wpa_cli command and optionally check the result.
75
76        @param command string: suffix of a command to be prefixed with
77                an appropriate wpa_cli for this host.
78        @param check_result bool: True iff we want to check that the
79                command comes back with an 'OK' response.
80        @return result object returned by host.run.
81
82        """
83        cmd = self._wpa_cli_cmd_format.format(
84                {'ifname' : self._wifi_if, 'cmd' : command})
85        result = self._host.run(cmd)
86        if check_result and not result.stdout.strip().endswith('OK'):
87            raise error.TestFail('wpa_cli command failed: %s' % command)
88
89        return result
90
91
92    def _get_status_dict(self):
93        """
94        Gets the status output for a WiFi interface.
95
96        Get the output of wpa_cli status.  This summarizes what wpa_supplicant
97        is doing with respect to the WiFi interface.
98
99        Example output:
100
101            Using interface 'wlan0'
102            wpa_state=INACTIVE
103            p2p_device_address=32:76:6f:f2:a6:c4
104            address=30:76:6f:f2:a6:c4
105
106        @return dict of key/value pairs parsed from output using = as divider.
107
108        """
109        status_result = self.run_wpa_cli_cmd('status', check_result=False)
110        return dict([line.strip().split('=', 1)
111                     for line in status_result.stdout.splitlines()
112                     if line.find('=') > 0])
113
114
115    def _is_associating_or_associated(self):
116        """@return True if the DUT is assocating or associated with a BSS."""
117        state = self._get_status_dict().get('wpa_state', None)
118        return state in (self.WPA_SUPPLICANT_ASSOCIATING_STATES +
119                         self.WPA_SUPPLICANT_ASSOCIATED_STATES)
120
121
122    def _is_associated(self, ssid):
123        """
124        Check if the DUT is associated to a given SSID.
125
126        @param ssid string: SSID of the network we're concerned about.
127        @return True if we're associated with the specified SSID.
128
129        """
130        status_dict = self._get_status_dict()
131        return (status_dict.get('ssid', None) == ssid and
132                status_dict.get('wpa_state', None) in
133                        self.WPA_SUPPLICANT_ASSOCIATED_STATES)
134
135
136    def _is_connected(self, ssid):
137        """
138        Check that we're connected to |ssid| and have an IP address.
139
140        @param ssid string: SSID of the network we're concerned about.
141        @return True if we have an IP and we're associated with |ssid|.
142
143        """
144        status_dict = self._get_status_dict()
145        return (status_dict.get('ssid', None) == ssid and
146                status_dict.get('ip_address', None))
147
148
149    def _wait_until(self, value_check, timeout_seconds):
150        """
151        Call a function repeatedly until we time out.
152
153        Call value_check() every POLLING_INTERVAL_SECONDS seconds
154        until |timeout_seconds| have passed.  Return whether
155        value_check() returned a True value and the time we spent in this
156        function.
157
158        @param timeout_seconds numeric: number of seconds to wait.
159        @return a tuple (success, duration_seconds) where success is a boolean
160                and duration is a float.
161
162        """
163        start_time = time.time()
164        while time.time() - start_time < timeout_seconds:
165            duration = time.time() - start_time
166            if value_check():
167                return (True, duration)
168
169            time.sleep(self.POLLING_INTERVAL_SECONDS)
170        duration = time.time() - start_time
171        return (False, duration)
172
173
174    def clean_profiles(self):
175        """Remove state associated with past networks we've connected to."""
176        # list_networks output looks like:
177        # Using interface 'wlan0'^M
178        # network id / ssid / bssid / flags^M
179        # 0    SimpleConnect_jstja_ch1 any     [DISABLED]^M
180        # 1    SimpleConnect_gjji2_ch6 any     [DISABLED]^M
181        # 2    SimpleConnect_xe9d1_ch11        any     [DISABLED]^M
182        list_networks_result = self.run_wpa_cli_cmd(
183                'list_networks', check_result=False)
184        start_parsing = False
185        for line in list_networks_result.stdout.splitlines():
186            if not start_parsing:
187                if line.startswith('network id'):
188                    start_parsing = True
189                continue
190
191            network_id = int(line.split()[0])
192            self.run_wpa_cli_cmd('remove_network %d' % network_id)
193        self._created_networks = {}
194
195
196    def create_profile(self, _):
197        """
198        This is a no op, since we don't have profiles.
199
200        @param _ ignored.
201
202        """
203        logging.info('Skipping create_profile on %s', self.__class__.__name__)
204
205
206    def pop_profile(self, _):
207        """
208        This is a no op, since we don't have profiles.
209
210        @param _ ignored.
211
212        """
213        logging.info('Skipping pop_profile on %s', self.__class__.__name__)
214
215
216    def push_profile(self, _):
217        """
218        This is a no op, since we don't have profiles.
219
220        @param _ ignored.
221
222        """
223        logging.info('Skipping push_profile on %s', self.__class__.__name__)
224
225
226    def remove_profile(self, _):
227        """
228        This is a no op, since we don't have profiles.
229
230        @param _ ignored.
231
232        """
233        logging.info('Skipping remove_profile on %s', self.__class__.__name__)
234
235
236    def init_test_network_state(self):
237        """Create a clean slate for tests with respect to remembered networks.
238
239        For wpa_cli hosts, this means removing all remembered networks.
240
241        @return True iff operation succeeded, False otherwise.
242
243        """
244        self.clean_profiles()
245        return True
246
247
248    def connect_wifi(self, assoc_params):
249        """
250        Connect to the WiFi network described by AssociationParameters.
251
252        @param assoc_params AssociationParameters object.
253        @return serialized AssociationResult object.
254
255        """
256        logging.debug('connect_wifi()')
257        # Ouptut should look like:
258        #   Using interface 'wlan0'
259        #   0
260        assoc_result = xmlrpc_datatypes.AssociationResult()
261        network_id = self._add_network(assoc_params.ssid)
262        if assoc_params.is_hidden:
263            self.run_wpa_cli_cmd('set_network %d %s %s' %
264                                 (network_id, 'scan_ssid', '1'))
265
266        sec_config = assoc_params.security_config
267        for field, value in sec_config.get_wpa_cli_properties().iteritems():
268            self.run_wpa_cli_cmd('set_network %d %s %s' %
269                                 (network_id, field, value))
270        self.run_wpa_cli_cmd('select_network %d' % network_id)
271
272        # Wait for an appropriate BSS to appear in scan results.
273        scan_results_pattern = '\t'.join(['([0-9a-f:]{17})', # BSSID
274                                          '([0-9]+)',  # Frequency
275                                          '(-[0-9]+)',  # Signal level
276                                          '(.*)',  # Encryption types
277                                          '(.*)'])  # SSID
278        last_scan_time = -1.0
279        start_time = time.time()
280        while time.time() - start_time < assoc_params.discovery_timeout:
281            assoc_result.discovery_time = time.time() - start_time
282            if self._is_associating_or_associated():
283                # Internally, wpa_supplicant writes its scan_results response
284                # to a 4kb buffer.  When there are many BSS's, the buffer fills
285                # up, and we'll never see the BSS we care about in some cases.
286                break
287
288            scan_result = self.run_wpa_cli_cmd('scan_results',
289                                               check_result=False)
290            found_stations = []
291            for line in scan_result.stdout.strip().splitlines():
292                match = re.match(scan_results_pattern, line)
293                if match is None:
294                    continue
295                found_stations.append(
296                        Station(bssid=match.group(1), frequency=match.group(2),
297                                signal=match.group(3), ssid=match.group(5)))
298            logging.debug('Found stations: %r',
299                          [station.ssid for station in found_stations])
300            if [station for station in found_stations
301                    if station.ssid == assoc_params.ssid]:
302                break
303
304            if time.time() - last_scan_time > self.SCANNING_INTERVAL_SECONDS:
305                # Sometimes this might fail with a FAIL-BUSY if the previous
306                # scan hasn't finished.
307                scan_result = self.run_wpa_cli_cmd('scan', check_result=False)
308                if scan_result.stdout.strip().endswith('OK'):
309                    last_scan_time = time.time()
310            time.sleep(self.POLLING_INTERVAL_SECONDS)
311        else:
312            assoc_result.failure_reason = 'Discovery timed out'
313            return assoc_result.serialize()
314
315        # Wait on association to finish.
316        success, assoc_result.association_time = self._wait_until(
317                lambda: self._is_associated(assoc_params.ssid),
318                assoc_params.association_timeout)
319        if not success:
320            assoc_result.failure_reason = 'Association timed out'
321            return assoc_result.serialize()
322
323        # Then wait for ip configuration to finish.
324        success, assoc_result.configuration_time = self._wait_until(
325                lambda: self._is_connected(assoc_params.ssid),
326                assoc_params.configuration_timeout)
327        if not success:
328            assoc_result.failure_reason = 'DHCP negotiation timed out'
329            return assoc_result.serialize()
330
331        assoc_result.success = True
332        logging.info('Connected to %s', assoc_params.ssid)
333        return assoc_result.serialize()
334
335
336    def disconnect(self, ssid):
337        """
338        Disconnect from a WiFi network named |ssid|.
339
340        @param ssid string: name of network to disable in wpa_supplicant.
341
342        """
343        logging.debug('disconnect()')
344        if ssid not in self._created_networks:
345            return False
346        self.run_wpa_cli_cmd('disable_network %d' %
347                             self._created_networks[ssid])
348        return True
349
350
351    def delete_entries_for_ssid(self, ssid):
352        """Delete a profile entry.
353
354        @param ssid string of WiFi service for which to delete entries.
355        @return True on success, False otherwise.
356        """
357        return self.disconnect(ssid)
358
359
360    def set_device_enabled(self, wifi_interface, enabled):
361        """Enable or disable the WiFi device.
362
363        @param wifi_interface: string name of interface being modified.
364        @param enabled: boolean; true if this device should be enabled,
365                false if this device should be disabled.
366        @return True if it worked; false, otherwise
367
368        """
369        return False
370
371
372    def sync_time_to(self, epoch_seconds):
373        """
374        Sync time on the DUT to |epoch_seconds| from the epoch.
375
376        @param epoch_seconds float: number of seconds since the epoch.
377
378        """
379        # This will claim to fail, but will work anyway.
380        self._host.run('date -u %f' % epoch_seconds, ignore_status=True)
381