"""Controller for Open WRT access point."""

import re
import time

import yaml

from acts import logger
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.openwrt_lib import network_settings
from acts.controllers.openwrt_lib import wireless_config
from acts.controllers.openwrt_lib import wireless_settings_applier
from acts.controllers.utils_lib.ssh import connection
from acts.controllers.utils_lib.ssh import settings

MOBLY_CONTROLLER_CONFIG_NAME = "OpenWrtAP"
ACTS_CONTROLLER_REFERENCE_NAME = "access_points"
OPEN_SECURITY = "none"
PSK1_SECURITY = 'psk'
PSK_SECURITY = "psk2"
WEP_SECURITY = "wep"
ENT_SECURITY = "wpa2"
OWE_SECURITY = "owe"
SAE_SECURITY = "sae"
ENABLE_RADIO = "0"
PMF_ENABLED = 2
WIFI_2G = "wifi2g"
WIFI_5G = "wifi5g"
# Maximum number of seconds start_ap()/stop_ap() poll for the radio state.
WAIT_TIME = 20

# Seconds to sleep between two radio state polls.
_POLL_INTERVAL = 3
# Radio sections queried through "wifi status".
_RADIOS = ("radio0", "radio1")
# Band key used by get_bssids_for_wifi_networks() for each radio.
_RADIO_TO_BAND = {"radio0": "5g", "radio1": "2g"}
# Security types whose WirelessConfig carries a "password" argument.
_PASSWORD_SECURITIES = (PSK_SECURITY, PSK1_SECURITY, SAE_SECURITY)
# Security types that are configured with ieee80211w=PMF_ENABLED.
_PMF_SECURITIES = (OWE_SECURITY, SAE_SECURITY)


def create(configs):
    """Creates ap controllers from a json config.

    Creates one ap controller per element of the config list. Each element
    is a dictionary holding the SSH login information of the AP.

    Args:
        configs: The json configs that represent this controller.

    Returns:
        A list of OpenWrtAP objects, one per entry in configs.

    Example:
        Below is the config file entry for OpenWrtAP as a list. A testbed
        can have 1 or more APs to configure. Each AP has a "ssh_config" key
        to provide SSH login information. OpenWrtAP#__init__() uses this to
        create SSH object.

            "OpenWrtAP": [
                {
                    "ssh_config": {
                        "user" : "root",
                        "host" : "192.168.1.1"
                    }
                },
                {
                    "ssh_config": {
                        "user" : "root",
                        "host" : "192.168.1.2"
                    }
                }
            ]
    """
    return [OpenWrtAP(c) for c in configs]


def destroy(aps):
    """Destroys a list of AccessPoints.

    Args:
        aps: The list of AccessPoints to destroy.
    """
    for ap in aps:
        try:
            ap.close()
        finally:
            # Always release the SSH connection, even when resetting the AP
            # settings failed; the original error still propagates.
            ap.close_ssh()


def get_info(aps):
    """Get information on a list of access points.

    Args:
        aps: A list of AccessPoints.

    Returns:
        A list of all aps hostname.
    """
    return [ap.ssh_settings.hostname for ap in aps]


class OpenWrtAP(object):
    """An AccessPoint controller.

    Attributes:
        ssh: The ssh connection to the AP.
        ssh_settings: The ssh settings being used by the ssh connection.
        log: Logging object for AccessPoint.
        wireless_setting: object holding wireless configuration. None until
            configure_ap() has been called.
        network_setting: Object for network configuration
    """

    def __init__(self, config):
        """Initialize AP.

        Args:
            config: dictionary with an "ssh_config" entry; its "host" value
                is also handed to the network settings object.
        """
        self.ssh_settings = settings.from_config(config["ssh_config"])
        self.ssh = connection.SshConnection(self.ssh_settings)
        self.log = logger.create_logger(
            lambda msg: "[OpenWrtAP|%s] %s" % (self.ssh_settings.hostname, msg))
        self.wireless_setting = None
        self.network_setting = network_settings.NetworkSettings(
            self.ssh, config["ssh_config"]["host"], self.log)

    def configure_ap(self, wifi_configs, channel_2g, channel_5g):
        """Configure AP with the required settings.

        Each test class inherits WifiBaseTest. Based on the test, we may need
        to configure PSK, WEP, OPEN, ENT networks on 2G and 5G bands in any
        combination. We call WifiBaseTest methods get_psk_network(),
        get_open_network(), get_wep_network() and get_ent_network() to create
        dictionaries which contains this information. 'wifi_configs' is a
        list of such dictionaries. Example below configures 2 WiFi networks -
        1 PSK 2G and 1 Open 5G on one AP. configure_ap() is called from
        WifiBaseTest to configure the APs.

        wifi_configs = [
            {
                '2g': {
                    'SSID': '2g_AkqXWPK4',
                    'security': 'psk2',
                    'password': 'YgYuXqDO9H',
                    'hiddenSSID': False
                },
            },
            {
                '5g': {
                    'SSID': '5g_8IcMR1Sg',
                    'security': 'none',
                    'hiddenSSID': False
                },
            }
        ]

        Args:
            wifi_configs: list of network settings for 2G and 5G bands.
            channel_2g: channel for 2G band.
            channel_5g: channel for 5G band.
        """
        # generate wifi configs to configure
        wireless_configs = self.generate_wireless_configs(wifi_configs)
        self.wireless_setting = wireless_settings_applier.WirelessSettingsApplier(
            self.ssh, wireless_configs, channel_2g, channel_5g)
        self.wireless_setting.apply_wireless_settings()

    def _wait_for_wifi_state(self, expected_up):
        """Poll get_wifi_status() until it matches expected_up or times out.

        Polls every _POLL_INTERVAL seconds for at most WAIT_TIME seconds and
        performs one last check after the deadline.

        Args:
            expected_up: True to wait for the radios to be up, False to wait
                for them to be down.

        Returns:
            True if the expected state was observed, False otherwise.
        """
        deadline = time.time() + WAIT_TIME
        while time.time() < deadline:
            if bool(self.get_wifi_status()) == expected_up:
                return True
            time.sleep(_POLL_INTERVAL)
        return bool(self.get_wifi_status()) == expected_up

    def start_ap(self):
        """Starts the AP with the settings in /etc/config/wireless.

        Raises:
            ValueError: if the radios are not reported up within WAIT_TIME.
        """
        self.ssh.run("wifi up")
        if not self._wait_for_wifi_state(True):
            raise ValueError("Failed to turn on WiFi on the AP.")

    def stop_ap(self):
        """Stops the AP.

        Raises:
            ValueError: if the radios are still reported up after WAIT_TIME.
        """
        self.ssh.run("wifi down")
        if not self._wait_for_wifi_state(False):
            raise ValueError("Failed to turn off WiFi on the AP.")

    def get_bssids_for_wifi_networks(self):
        """Get BSSIDs for wifi networks configured.

        Networks found on radio0 are reported under "5g" and networks found
        on radio1 under "2g".

        Returns:
            Dictionary of SSID - BSSID map for both bands.
        """
        bssid_map = {"2g": {}, "5g": {}}
        for radio in _RADIOS:
            band = _RADIO_TO_BAND[radio]
            for ssid, ifname in self.get_ifnames_for_ssids(radio).items():
                bssid_map[band][ssid] = self.get_bssid(ifname)
        return bssid_map

    def _load_wifi_status(self, radio=None):
        """Run "wifi status" on the AP and parse its output.

        Args:
            radio: radio name (e.g. "radio0") to query; when None the status
                of all radios is requested.

        Returns:
            The parsed status, indexable by radio name.
        """
        command = "wifi status %s" % radio if radio else "wifi status"
        str_output = self.ssh.run(command).stdout
        # NOTE(review): yaml.load() with FullLoader runs on text produced by
        # the device. The output is presumably plain JSON, in which case
        # yaml.safe_load() would be the safer choice - confirm before
        # switching.
        return yaml.load(str_output.replace("\t", "").replace("\n", ""),
                         Loader=yaml.FullLoader)

    def get_wifi_status(self):
        """Check if radios are up for both 2G and 5G bands.

        Returns:
            True if both radios are up. False if not.
        """
        # A list (not a generator) so that every radio is queried, even when
        # the first one is already down.
        radio_states = [
            self._load_wifi_status(radio)[radio]["up"] for radio in _RADIOS
        ]
        return all(radio_states)

    def get_ifnames_for_ssids(self, radio):
        """Get interfaces for wifi networks.

        Args:
            radio: name of the radio ("radio0" or "radio1") to read the
                interfaces from.

        Returns:
            dictionary of ssid - ifname mappings. Empty when the radio is
            not up.
        """
        radio_status = self._load_wifi_status(radio)[radio]
        if not radio_status["up"]:
            return {}
        return {
            iface["config"]["ssid"]: iface["ifname"]
            for iface in radio_status["interfaces"]
        }

    def get_bssid(self, ifname):
        """Get MAC address from an interface.

        Args:
            ifname: interface name of the corresponding MAC.

        Returns:
            BSSID of the interface, taken as the last whitespace separated
            token on the first line of the "ifconfig" output.
        """
        ifconfig = self.ssh.run("ifconfig %s" % ifname).stdout
        first_line = ifconfig.split("\n")[0]
        return first_line.split()[-1]

    def set_wpa_encryption(self, encryption):
        """Set different encryptions to wpa or wpa2.

        Args:
            encryption: encryption value to write, e.g. a "psk"/"psk2" value
                combined with ccmp, tkip, or ccmp+tkip.
        """
        wifi_status = self._load_wifi_status()

        # Counting how many interface are enabled.
        total_interface = sum(
            len(wifi_status[radio]['interfaces']) for radio in _RADIOS)

        target_is_psk = re.match(r'psk\b', encryption) is not None
        target_is_psk2 = re.match(r'psk2\b', encryption) is not None

        # Iterates every interface to get and set wpa encryption.
        default_extra_interface = 2
        for i in range(total_interface + default_extra_interface):
            origin_encryption = self.ssh.run(
                'uci get wireless.@wifi-iface[{}].encryption'.format(i)).stdout
            origin_is_psk = re.match(r'psk\b', origin_encryption) is not None
            origin_is_psk2 = re.match(r'psk2\b', origin_encryption) is not None

            # An interface is left untouched only when one side is "psk" and
            # the other is "psk2"; every other interface is rewritten.
            # NOTE(review): this also rewrites interfaces whose current
            # encryption is neither psk nor psk2 (e.g. "none") - confirm that
            # this is intended.
            wpa_version_mismatch = ((origin_is_psk and target_is_psk2) or
                                    (origin_is_psk2 and target_is_psk))
            if wpa_version_mismatch:
                continue
            self.ssh.run(
                'uci set wireless.@wifi-iface[{}].encryption={}'.format(
                    i, encryption))

        self.ssh.run("uci commit wireless")
        self.ssh.run("wifi")

    def _set_iface_password(self, iface_index, band_label, password):
        """Validate a password and write it as key of one wifi-iface section.

        Invalid passwords are only logged as errors; nothing is written for
        them.

        Args:
            iface_index: index of the wifi-iface section to update.
            band_label: band name used in the log message, e.g. "5G".
            password: 8 ~ 63 chars, ascii letters and digits.
        """
        if len(password) < 8 or len(password) > 63:
            self.log.error("Password must be 8~63 characters long")
        # Only accept ascii letters and digits. fullmatch() also rejects a
        # trailing newline, which "$" would have let through.
        elif not re.fullmatch(r"[A-Za-z0-9]*", password):
            self.log.error(
                "Password must only contains ascii letters and digits")
        else:
            self.ssh.run('uci set wireless.@wifi-iface[{}].key={}'.format(
                iface_index, password))
            self.log.info("Set {} password to :{}".format(band_label, password))

    def set_password(self, pwd_5g=None, pwd_2g=None):
        """Set password for individual interface.

        The wireless config is committed and wifi reloaded even when no
        password was changed.

        Args:
            pwd_5g: 8 ~ 63 chars, ascii letters and digits password for 5g
                network.
            pwd_2g: 8 ~ 63 chars, ascii letters and digits password for 2g
                network.
        """
        # The 5G key is written to wifi-iface[3] and the 2G key to
        # wifi-iface[2].
        if pwd_5g:
            self._set_iface_password(3, "5G", pwd_5g)

        if pwd_2g:
            self._set_iface_password(2, "2G", pwd_2g)

        self.ssh.run("uci commit wireless")
        self.ssh.run("wifi")

    @staticmethod
    def _build_wireless_config(name, band, config):
        """Build the WirelessConfig for one network dictionary.

        Args:
            name: name of the wireless config, e.g. "wifi2g1".
            band: band the network is configured on.
            config: network dictionary as described in configure_ap().

        Returns:
            A WirelessConfig object, or None when the security type of the
            network is not one of the supported ones.
        """
        security = config["security"]
        if security in _PASSWORD_SECURITIES:
            extra_args = {"password": config["password"]}
        elif security == WEP_SECURITY:
            extra_args = {"wep_key": config["wepKeys"][0]}
        elif security in (OPEN_SECURITY, OWE_SECURITY):
            extra_args = {}
        elif security == ENT_SECURITY:
            extra_args = {
                "radius_server_ip": config["radius_server_ip"],
                "radius_server_port": config["radius_server_port"],
                "radius_server_secret": config["radius_server_secret"],
            }
        else:
            return None

        extra_args["hidden"] = config["hiddenSSID"]
        if security in _PMF_SECURITIES:
            extra_args["ieee80211w"] = PMF_ENABLED
        return wireless_config.WirelessConfig(name, config["SSID"], security,
                                              band, **extra_args)

    def generate_wireless_configs(self, wifi_configs):
        """Generate wireless configs to configure.

        Converts wifi_configs from configure_ap() to a list of
        'WirelessConfig' objects. Each object represents a wifi network to
        configure on the AP. Networks are named "wifi2g<N>" / "wifi5g<N>"
        where N counts the networks seen so far on that band, starting at 1.
        Networks with an unsupported security type are skipped but still
        consume a number.

        Args:
            wifi_configs: Network list of different security types and bands.

        Returns:
            wireless configuration for openwrt AP.
        """
        bands = (
            (hostapd_constants.BAND_2G, WIFI_2G),
            (hostapd_constants.BAND_5G, WIFI_5G),
        )
        next_index = {band: 1 for band, _ in bands}
        wireless_configs = []

        for network in wifi_configs:
            for band, name_prefix in bands:
                if band not in network:
                    continue
                name = "%s%s" % (name_prefix, next_index[band])
                new_config = self._build_wireless_config(
                    name, band, network[band])
                if new_config is not None:
                    wireless_configs.append(new_config)
                next_index[band] += 1

        return wireless_configs

    def get_wifi_network(self, security=None, band=None):
        """Return first match wifi interface's config.

        The returned dictionary is a copy, so changing it does not alter the
        stored wireless configuration.

        Args:
            security: psk2 or none
            band: '2g' or '5g'

        Returns:
            A dict contains match wifi interface's config, with an extra
            "SSID" key and without the "password" key when no password is
            set. None if nothing matches or no AP configuration was applied
            yet.
        """
        if not self.wireless_setting:
            return None

        for wifi_iface in self.wireless_setting.wireless_configs:
            # Work on a copy: editing __dict__ directly would add and remove
            # attributes on the stored config object itself.
            wifi_network = dict(vars(wifi_iface))
            if security and security != wifi_network["security"]:
                continue
            if band and band != wifi_network["band"]:
                continue

            wifi_network["SSID"] = wifi_network["ssid"]
            if not wifi_network.get("password"):
                wifi_network.pop("password", None)
            return wifi_network
        return None

    def close(self):
        """Reset wireless and network settings to default and stop AP."""
        if self.network_setting.config:
            self.network_setting.cleanup_network_settings()
        if self.wireless_setting:
            self.wireless_setting.cleanup_wireless_settings()

    def close_ssh(self):
        """Close SSH connection to AP."""
        self.ssh.close()