• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
17import time
18
19from acts import logger
20from acts import signals
21from acts import utils
22
# Seconds to sleep between successive polls of the PHY country codes in
# WlanController.set_country_code.
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
# Maximum number of seconds WlanController.set_country_code keeps polling for
# the requested country code to show up on every PHY before giving up.
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10
25
26
class WlanControllerError(signals.ControllerError):
    """Raised by WlanController when a wlan or netstack query on the device
    reports an error."""
29
30
class WlanController:
    """Contains methods related to wlan core, to be used in FuchsiaDevice object"""
    def __init__(self, fuchsia_device):
        """Binds the controller to a device and creates a tagged logger.

        Args:
            fuchsia_device: the device object this controller operates on. Its
                `ip` attribute tags the log output, and its `wlan_lib`,
                `netstack_lib` and `regulatory_region_lib` attributes are
                used by the methods of this class.
        """
        self.device = fuchsia_device
        self.log = logger.create_tagged_trace_logger(
            'WlanController for FuchsiaDevice | %s' % self.device.ip)

    # TODO(70501): Wrap wlan_lib functions and setup from FuchsiaDevice here
    # (similar to how WlanPolicyController does it) to prevent FuchsiaDevice
    # from growing too large.
    def _configure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def _deconfigure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def get_wlan_client_interface_id(self):
        """ Returns the wlan interface id of the first found wlan client
        interface.

        Returns:
            The id of the first wlan iface whose role is 'client'
            (case-insensitive), or None if no such iface exists.

        Raises:
            WlanControllerError: if listing the iface ids or querying an
                iface reports an error.
        """
        # Retrieve wlan ifaces
        response = self.device.wlan_lib.wlanGetIfaceIdList()
        if response.get('error'):
            raise WlanControllerError('Failed to get WLAN iface ids: %s' %
                                      response['error'])

        # If iface has role 'client', return id. A present-but-null 'result'
        # is treated the same as an empty list.
        for iface_id in response.get('result') or []:
            query_response = self.device.wlan_lib.wlanQueryInterface(iface_id)
            if query_response.get('error'):
                raise WlanControllerError(
                    'Failed to query wlan iface id %s: %s' %
                    (iface_id, query_response['error']))

            # An iface that reports no role is simply not a client.
            role = query_response['result'].get('role')
            if role and role.lower() == 'client':
                return iface_id

        return None

    def get_wlan_interface_mac_addr_from_id(self, iface_id):
        """ Retrieves the mac address of a wlan iface, using the wlan iface
        id.

        Args:
            iface_id: int, wlan iface id

        Returns:
            string, mac address of wlan iface

        Raises:
            WlanControllerError: if querying the iface reports an error.
        """
        query_response = self.device.wlan_lib.wlanQueryInterface(iface_id)
        if query_response.get('error'):
            raise WlanControllerError('Failed to query wlan iface id %s: %s' %
                                      (iface_id, query_response['error']))
        return utils.mac_address_list_to_str(
            query_response['result'].get('mac_addr'))

    def get_wlan_interface_name(self, mac_addr=None):
        """ Retrieves name (netstack) of wlan interface using the mac address. If
        mac address is not provided, returns the name of the first found wlan
        client (as opposed to AP) interface.

        Args:
            mac_addr: optional, string or list of decimal octets representing
                the mac addr of the wlan interface. e.g. "44:07:0b:50:c1:ef" or
                [68, 7, 11, 80, 193, 239]

        Returns:
            string, name of wlan interface, or None if no network interface
            has a matching mac address

        Raises:
            WlanControllerError: if no mac address was given and no wlan
                client interface exists, or if a wlan/netstack call reports
                an error.
        """
        # Default to first found client wlan interface
        if not mac_addr:
            client_iface_id = self.get_wlan_client_interface_id()
            if client_iface_id is None:
                # Fail with a clear message instead of querying iface "None".
                raise WlanControllerError(
                    'No wlan client interface found on device.')
            mac_addr = self.get_wlan_interface_mac_addr_from_id(
                client_iface_id)

        # Convert mac addr to list, for comparison
        if isinstance(mac_addr, str):
            mac_addr = utils.mac_address_str_to_list(mac_addr)

        err = self.device.netstack_lib.init().get('error')
        if err:
            raise WlanControllerError('Failed to init netstack_lib: %s' % err)

        # Retrieve net ifaces
        response = self.device.netstack_lib.netstackListInterfaces()
        if response.get('error'):
            raise WlanControllerError(
                'Failed to get network interfaces list: %s' %
                response['error'])

        # Find iface with matching mac addr, and return name
        for iface_info in response['result']:
            if iface_info['mac'] == mac_addr:
                return iface_info['name']
        return None

    def set_country_code(self, country_code):
        """Sets country code through the regulatory region service and waits
        for the code to be applied to WLAN PHY.

        Polls every PHY until all of them report the requested code, sleeping
        TIME_TO_SLEEP_BETWEEN_RETRIES seconds between rounds and giving up
        after TIME_TO_WAIT_FOR_COUNTRY_CODE seconds.

        Args:
            country_code: string, the 2 character country code to set

        Raises:
            EnvironmentError - failure to get/set regulatory region
            ConnectionError - failure to query PHYs
        """
        self.log.info('Setting DUT country code to %s' % country_code)
        country_code_response = self.device.regulatory_region_lib.setRegion(
            country_code)
        if country_code_response.get('error'):
            raise EnvironmentError(
                'Failed to set country code (%s) on DUT. Error: %s' %
                (country_code, country_code_response['error']))

        self.log.info('Verifying DUT country code was correctly set to %s.' %
                      country_code)
        phy_ids_response = self.device.wlan_lib.wlanPhyIdList()
        if phy_ids_response.get('error'):
            # The message has a single placeholder; formatting it with a
            # 2-tuple would raise TypeError instead of this ConnectionError.
            raise ConnectionError('Failed to get phy ids from DUT. Error: %s' %
                                  phy_ids_response['error'])

        end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE
        while time.time() < end_time:
            for phy_id in phy_ids_response['result']:
                get_country_response = self.device.wlan_lib.wlanGetCountry(
                    phy_id)
                if get_country_response.get('error'):
                    raise ConnectionError(
                        'Failed to query PHY ID (%s) for country. Error: %s' %
                        (phy_id, get_country_response['error']))

                # The result is a sequence of character codes; rebuild the
                # country code string from it.
                set_code = ''.join(
                    chr(ascii_char)
                    for ascii_char in get_country_response['result'])
                if set_code != country_code:
                    self.log.debug(
                        'PHY (id: %s) has incorrect country code set. '
                        'Expected: %s, Got: %s' %
                        (phy_id, country_code, set_code))
                    # At least one PHY is not updated yet; retry the round.
                    break
            else:
                # for-else: no PHY mismatched in this round.
                self.log.info('All PHYs have expected country code (%s)' %
                              country_code)
                break
            time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
        else:
            # while-else: the deadline passed without a fully matching round.
            raise EnvironmentError('Failed to set DUT country code to %s.' %
                                   country_code)
180