• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18
19from acts import logger
20from acts import signals
21from acts import utils
22
# Overall deadline, in seconds, for every WLAN PHY to report the requested
# country code after it has been set.
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10
# Pause, in seconds, between successive polls of the PHYs while waiting for
# the country code to be applied.
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
25
26
class WlanControllerError(signals.ControllerError):
    """Error raised by WlanController when a WLAN interface query fails."""
29
30
class WlanController:
    """Contains methods related to wlan core, to be used in FuchsiaDevice object"""

    def __init__(self, fuchsia_device):
        """Binds the controller to a device and creates a tagged logger.

        Args:
            fuchsia_device: the FuchsiaDevice this controller operates on. Its
                `ip` attribute is used to tag this controller's log output.
        """
        self.device = fuchsia_device
        self.log = logger.create_tagged_trace_logger(
            'WlanController for FuchsiaDevice | %s' % self.device.ip)

    # TODO(70501): Wrap wlan_lib functions and setup from FuchsiaDevice here
    # (similar to how WlanPolicyController does it) to prevent FuchsiaDevice
    # from growing too large.
    def _configure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def _deconfigure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def update_wlan_interfaces(self):
        """ Retrieves WLAN interfaces from device and sets the FuchsiaDevice
        attributes.

        Sets `wlan_client_interfaces`, `wlan_ap_interfaces`,
        `wlan_client_test_interface_name` and `wlan_ap_test_interface_name`
        on the device.

        Raises:
            WlanControllerError - failure to retrieve the WLAN interfaces
        """
        wlan_interfaces = self.get_interfaces_by_role()
        self.device.wlan_client_interfaces = wlan_interfaces['client']
        self.device.wlan_ap_interfaces = wlan_interfaces['ap']

        # Set test interfaces to value from config, else the first found
        # interface, else None
        self.device.wlan_client_test_interface_name = self.device.conf_data.get(
            'wlan_client_test_interface',
            next(iter(self.device.wlan_client_interfaces), None))

        self.device.wlan_ap_test_interface_name = self.device.conf_data.get(
            'wlan_ap_test_interface',
            next(iter(self.device.wlan_ap_interfaces), None))

    def get_interfaces_by_role(self):
        """ Retrieves WLAN interface information, supplemented by netstack info.

        Returns:
            Dict with keys 'client' and 'ap', each of which contain WLAN
            interfaces, mapped by interface name.

        Raises:
            WlanControllerError - failure to list or query WLAN interfaces
        """

        # Retrieve WLAN interface IDs
        response = self.device.wlan_lib.wlanGetIfaceIdList()
        if response.get('error'):
            raise WlanControllerError('Failed to get WLAN iface ids: %s' %
                                      response['error'])

        # `or []` also covers a response whose 'result' key is present but
        # None, which would otherwise blow up on the emptiness check.
        wlan_iface_ids = response.get('result') or []
        if not wlan_iface_ids:
            return {'client': {}, 'ap': {}}

        # Use IDs to get WLAN interface info and mac addresses
        wlan_ifaces_by_mac = {}
        for iface_id in wlan_iface_ids:
            response = self.device.wlan_lib.wlanQueryInterface(iface_id)
            if response.get('error'):
                raise WlanControllerError(
                    'Failed to query wlan iface id %s: %s' %
                    (iface_id, response['error']))

            iface_info = response['result']
            mac = iface_info.get('sta_addr', None)
            if mac is None:
                # Fallback to older field name to maintain backwards
                # compatibility with older versions of SL4F's
                # QueryIfaceResponse. See https://fxrev.dev/562146.
                mac = iface_info.get('mac_addr')

            wlan_ifaces_by_mac[utils.mac_address_list_to_str(
                mac)] = iface_info

        # Use mac addresses to query the interfaces from the netstack view,
        # which allows us to supplement the interface information with the name,
        # netstack_id, etc.

        # TODO(fxb/75909): This tedium is necessary to get the interface name
        # because only netstack has that information. The bug linked here is
        # to reconcile some of the information between the two perspectives, at
        # which point we can eliminate this step.
        net_ifaces = self.device.netstack_controller.list_interfaces()
        wlan_ifaces_by_role = {'client': {}, 'ap': {}}
        for iface in net_ifaces:
            try:
                # Some interfaces might not have a MAC
                iface_mac = utils.mac_address_list_to_str(iface['mac'])
            except Exception as e:
                # Best effort: an interface without a usable MAC cannot be
                # matched to a WLAN interface, so it is skipped.
                self.log.debug(f'Error {e} getting MAC for iface {iface}')
                continue

            wlan_iface = wlan_ifaces_by_mac.get(iface_mac)
            if wlan_iface is None:
                # Not a WLAN interface.
                continue

            wlan_iface['netstack_id'] = iface['id']

            # Add to return dict, mapped by role then name.
            role = wlan_iface['role'].lower()
            wlan_ifaces_by_role[role][iface['name']] = wlan_iface

        return wlan_ifaces_by_role

    def set_country_code(self, country_code):
        """Sets country code through the regulatory region service and waits
        for the code to be applied to WLAN PHY.

        Args:
            country_code: string, the 2 character country code to set

        Raises:
            EnvironmentError - failure to get/set regulatory region, or the
                PHYs did not report the code before the wait timed out
            ConnectionError - failure to query PHYs
        """
        self.log.info('Setting DUT country code to %s' % country_code)
        country_code_response = self.device.regulatory_region_lib.setRegion(
            country_code)
        if country_code_response.get('error'):
            raise EnvironmentError(
                'Failed to set country code (%s) on DUT. Error: %s' %
                (country_code, country_code_response['error']))

        self.log.info('Verifying DUT country code was correctly set to %s.' %
                      country_code)
        phy_ids_response = self.device.wlan_lib.wlanPhyIdList()
        if phy_ids_response.get('error'):
            # The message has a single placeholder; passing two values here
            # used to raise TypeError instead of the intended ConnectionError.
            raise ConnectionError('Failed to get phy ids from DUT. Error: %s' %
                                  (phy_ids_response['error'], ))

        end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE
        while time.time() < end_time:
            for phy_id in phy_ids_response['result']:
                get_country_response = self.device.wlan_lib.wlanGetCountry(
                    phy_id)
                if get_country_response.get('error'):
                    raise ConnectionError(
                        'Failed to query PHY ID (%s) for country. Error: %s' %
                        (phy_id, get_country_response['error']))

                # The country is reported as a sequence of character codes.
                set_code = ''.join(
                    chr(ascii_char)
                    for ascii_char in get_country_response['result'])
                if set_code != country_code:
                    self.log.debug(
                        'PHY (id: %s) has incorrect country code set. '
                        'Expected: %s, Got: %s' %
                        (phy_id, country_code, set_code))
                    # Mismatch: stop checking and retry after a pause.
                    break
            else:
                # for-else: reached only when no PHY reported a mismatch.
                self.log.info('All PHYs have expected country code (%s)' %
                              country_code)
                break
            time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
        else:
            # while-else: the deadline passed without a successful check.
            raise EnvironmentError('Failed to set DUT country code to %s.' %
                                   country_code)
182