1"""Controller for Open WRT access point.""" 2 3import random 4import re 5import time 6 7from acts import logger 8from acts import signals 9from acts.controllers.ap_lib import hostapd_constants 10from acts.controllers.openwrt_lib import network_settings 11from acts.controllers.openwrt_lib import wireless_config 12from acts.controllers.openwrt_lib import wireless_settings_applier 13from acts.controllers.openwrt_lib.openwrt_constants import OpenWrtModelMap as modelmap 14from acts.controllers.openwrt_lib.openwrt_constants import OpenWrtWifiSetting 15from acts.controllers.openwrt_lib.openwrt_constants import SYSTEM_INFO_CMD 16from acts.controllers.utils_lib.ssh import connection 17from acts.controllers.utils_lib.ssh import settings 18import yaml 19 20 21MOBLY_CONTROLLER_CONFIG_NAME = "OpenWrtAP" 22ACTS_CONTROLLER_REFERENCE_NAME = "access_points" 23OPEN_SECURITY = "none" 24PSK1_SECURITY = "psk" 25PSK_SECURITY = "psk2" 26WEP_SECURITY = "wep" 27ENT_SECURITY = "wpa2" 28OWE_SECURITY = "owe" 29SAE_SECURITY = "sae" 30SAEMIXED_SECURITY = "sae-mixed" 31ENABLE_RADIO = "0" 32PMF_ENABLED = 2 33WIFI_2G = "wifi2g" 34WIFI_5G = "wifi5g" 35WAIT_TIME = 20 36DEFAULT_RADIOS = ("radio0", "radio1") 37 38 39def create(configs): 40 """Creates ap controllers from a json config. 41 42 Creates an ap controller from either a list, or a single element. The element 43 can either be just the hostname or a dictionary containing the hostname and 44 username of the AP to connect to over SSH. 45 46 Args: 47 configs: The json configs that represent this controller. 48 49 Returns: 50 AccessPoint object 51 52 Example: 53 Below is the config file entry for OpenWrtAP as a list. A testbed can have 54 1 or more APs to configure. Each AP has a "ssh_config" key to provide SSH 55 login information. OpenWrtAP#__init__() uses this to create SSH object. 56 57 "OpenWrtAP": [ 58 { 59 "ssh_config": { 60 "user" : "root", 61 "host" : "192.168.1.1" 62 } 63 }, 64 { 65 "ssh_config": { 66 "user" : "root", 67 "host" : "192.168.1.2" 68 } 69 } 70 ] 71 """ 72 return [OpenWrtAP(c) for c in configs] 73 74 75def destroy(aps): 76 """Destroys a list of AccessPoints. 77 78 Args: 79 aps: The list of AccessPoints to destroy. 80 """ 81 for ap in aps: 82 ap.close() 83 ap.close_ssh() 84 85 86def get_info(aps): 87 """Get information on a list of access points. 88 89 Args: 90 aps: A list of AccessPoints. 91 92 Returns: 93 A list of all aps hostname. 94 """ 95 return [ap.ssh_settings.hostname for ap in aps] 96 97 98class OpenWrtAP(object): 99 """An AccessPoint controller. 100 101 Attributes: 102 ssh: The ssh connection to the AP. 103 ssh_settings: The ssh settings being used by the ssh connection. 104 log: Logging object for AccessPoint. 105 wireless_setting: object holding wireless configuration. 106 network_setting: Object for network configuration. 107 model: OpenWrt HW model. 108 radios: Fit interface for test. 109 """ 110 111 def __init__(self, config): 112 """Initialize AP.""" 113 self.ssh_settings = settings.from_config(config["ssh_config"]) 114 self.ssh = connection.SshConnection(self.ssh_settings) 115 self.log = logger.create_logger( 116 lambda msg: "[OpenWrtAP|%s] %s" % (self.ssh_settings.hostname, msg)) 117 self.wireless_setting = None 118 self.network_setting = network_settings.NetworkSettings( 119 self.ssh, self.ssh_settings, self.log) 120 self.model = self.get_model_name() 121 if self.model in modelmap.__dict__: 122 self.radios = modelmap.__dict__[self.model] 123 else: 124 self.radios = DEFAULT_RADIOS 125 126 def configure_ap(self, wifi_configs, channel_2g, channel_5g): 127 """Configure AP with the required settings. 128 129 Each test class inherits WifiBaseTest. Based on the test, we may need to 130 configure PSK, WEP, OPEN, ENT networks on 2G and 5G bands in any 131 combination. We call WifiBaseTest methods get_psk_network(), 132 get_open_network(), get_wep_network() and get_ent_network() to create 133 dictionaries which contains this information. 'wifi_configs' is a list of 134 such dictionaries. Example below configures 2 WiFi networks - 1 PSK 2G and 135 1 Open 5G on one AP. configure_ap() is called from WifiBaseTest to 136 configure the APs. 137 138 wifi_configs = [ 139 { 140 '2g': { 141 'SSID': '2g_AkqXWPK4', 142 'security': 'psk2', 143 'password': 'YgYuXqDO9H', 144 'hiddenSSID': False 145 }, 146 }, 147 { 148 '5g': { 149 'SSID': '5g_8IcMR1Sg', 150 'security': 'none', 151 'hiddenSSID': False 152 }, 153 } 154 ] 155 156 Args: 157 wifi_configs: list of network settings for 2G and 5G bands. 158 channel_2g: channel for 2G band. 159 channel_5g: channel for 5G band. 160 """ 161 # generate wifi configs to configure 162 wireless_configs = self.generate_wireless_configs(wifi_configs) 163 self.wireless_setting = wireless_settings_applier.WirelessSettingsApplier( 164 self.ssh, wireless_configs, channel_2g, channel_5g, self.radios[1], self.radios[0]) 165 self.wireless_setting.apply_wireless_settings() 166 167 def start_ap(self): 168 """Starts the AP with the settings in /etc/config/wireless.""" 169 self.ssh.run("wifi up") 170 curr_time = time.time() 171 while time.time() < curr_time + WAIT_TIME: 172 if self.get_wifi_status(): 173 return 174 time.sleep(3) 175 if not self.get_wifi_status(): 176 raise ValueError("Failed to turn on WiFi on the AP.") 177 178 def stop_ap(self): 179 """Stops the AP.""" 180 self.ssh.run("wifi down") 181 curr_time = time.time() 182 while time.time() < curr_time + WAIT_TIME: 183 if not self.get_wifi_status(): 184 return 185 time.sleep(3) 186 if self.get_wifi_status(): 187 raise ValueError("Failed to turn off WiFi on the AP.") 188 189 def get_bssids_for_wifi_networks(self): 190 """Get BSSIDs for wifi networks configured. 191 192 Returns: 193 Dictionary of SSID - BSSID map for both bands. 194 """ 195 bssid_map = {"2g": {}, "5g": {}} 196 for radio in self.radios: 197 ssid_ifname_map = self.get_ifnames_for_ssids(radio) 198 if radio == self.radios[0]: 199 for ssid, ifname in ssid_ifname_map.items(): 200 bssid_map["5g"][ssid] = self.get_bssid(ifname) 201 elif radio == self.radios[1]: 202 for ssid, ifname in ssid_ifname_map.items(): 203 bssid_map["2g"][ssid] = self.get_bssid(ifname) 204 return bssid_map 205 206 def get_ifnames_for_ssids(self, radio): 207 """Get interfaces for wifi networks. 208 209 Args: 210 radio: 2g or 5g radio get the bssids from. 211 212 Returns: 213 dictionary of ssid - ifname mappings. 214 """ 215 ssid_ifname_map = {} 216 str_output = self.ssh.run("wifi status %s" % radio).stdout 217 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 218 Loader=yaml.SafeLoader) 219 wifi_status = wifi_status[radio] 220 if wifi_status["up"]: 221 interfaces = wifi_status["interfaces"] 222 for config in interfaces: 223 ssid = config["config"]["ssid"] 224 ifname = config["ifname"] 225 ssid_ifname_map[ssid] = ifname 226 return ssid_ifname_map 227 228 def get_bssid(self, ifname): 229 """Get MAC address from an interface. 230 231 Args: 232 ifname: interface name of the corresponding MAC. 233 234 Returns: 235 BSSID of the interface. 236 """ 237 ifconfig = self.ssh.run("ifconfig %s" % ifname).stdout 238 mac_addr = ifconfig.split("\n")[0].split()[-1] 239 return mac_addr 240 241 def set_wpa_encryption(self, encryption): 242 """Set different encryptions to wpa or wpa2. 243 244 Args: 245 encryption: ccmp, tkip, or ccmp+tkip. 246 """ 247 str_output = self.ssh.run("wifi status").stdout 248 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 249 Loader=yaml.SafeLoader) 250 251 # Counting how many interface are enabled. 252 total_interface = 0 253 for radio in self.radios: 254 num_interface = len(wifi_status[radio]["interfaces"]) 255 total_interface += num_interface 256 257 # Iterates every interface to get and set wpa encryption. 258 default_extra_interface = 2 259 for i in range(total_interface + default_extra_interface): 260 origin_encryption = self.ssh.run( 261 "uci get wireless.@wifi-iface[{}].encryption".format(i)).stdout 262 origin_psk_pattern = re.match(r"psk\b", origin_encryption) 263 target_psk_pattern = re.match(r"psk\b", encryption) 264 origin_psk2_pattern = re.match(r"psk2\b", origin_encryption) 265 target_psk2_pattern = re.match(r"psk2\b", encryption) 266 267 if origin_psk_pattern == target_psk_pattern: 268 self.ssh.run( 269 "uci set wireless.@wifi-iface[{}].encryption={}".format( 270 i, encryption)) 271 272 if origin_psk2_pattern == target_psk2_pattern: 273 self.ssh.run( 274 "uci set wireless.@wifi-iface[{}].encryption={}".format( 275 i, encryption)) 276 277 self.ssh.run("uci commit wireless") 278 self.ssh.run("wifi") 279 280 def set_password(self, pwd_5g=None, pwd_2g=None): 281 """Set password for individual interface. 282 283 Args: 284 pwd_5g: 8 ~ 63 chars, ascii letters and digits password for 5g network. 285 pwd_2g: 8 ~ 63 chars, ascii letters and digits password for 2g network. 286 """ 287 if pwd_5g: 288 if len(pwd_5g) < 8 or len(pwd_5g) > 63: 289 self.log.error("Password must be 8~63 characters long") 290 # Only accept ascii letters and digits 291 elif not re.match("^[A-Za-z0-9]*$", pwd_5g): 292 self.log.error("Password must only contains ascii letters and digits") 293 else: 294 self.ssh.run( 295 "uci set wireless.@wifi-iface[{}].key={}".format(3, pwd_5g)) 296 self.log.info("Set 5G password to :{}".format(pwd_5g)) 297 298 if pwd_2g: 299 if len(pwd_2g) < 8 or len(pwd_2g) > 63: 300 self.log.error("Password must be 8~63 characters long") 301 # Only accept ascii letters and digits 302 elif not re.match("^[A-Za-z0-9]*$", pwd_2g): 303 self.log.error("Password must only contains ascii letters and digits") 304 else: 305 self.ssh.run( 306 "uci set wireless.@wifi-iface[{}].key={}".format(2, pwd_2g)) 307 self.log.info("Set 2G password to :{}".format(pwd_2g)) 308 309 self.ssh.run("uci commit wireless") 310 self.ssh.run("wifi") 311 312 def set_ssid(self, ssid_5g=None, ssid_2g=None): 313 """Set SSID for individual interface. 314 315 Args: 316 ssid_5g: 8 ~ 63 chars for 5g network. 317 ssid_2g: 8 ~ 63 chars for 2g network. 318 """ 319 if ssid_5g: 320 if len(ssid_5g) < 8 or len(ssid_5g) > 63: 321 self.log.error("SSID must be 8~63 characters long") 322 # Only accept ascii letters and digits 323 else: 324 self.ssh.run( 325 "uci set wireless.@wifi-iface[{}].ssid={}".format(3, ssid_5g)) 326 self.log.info("Set 5G SSID to :{}".format(ssid_5g)) 327 328 if ssid_2g: 329 if len(ssid_2g) < 8 or len(ssid_2g) > 63: 330 self.log.error("SSID must be 8~63 characters long") 331 # Only accept ascii letters and digits 332 else: 333 self.ssh.run( 334 "uci set wireless.@wifi-iface[{}].ssid={}".format(2, ssid_2g)) 335 self.log.info("Set 2G SSID to :{}".format(ssid_2g)) 336 337 self.ssh.run("uci commit wireless") 338 self.ssh.run("wifi") 339 340 def generate_mobility_domain(self): 341 """Generate 4-character hexadecimal ID. 342 343 Returns: 344 String; a 4-character hexadecimal ID. 345 """ 346 md = "{:04x}".format(random.getrandbits(16)) 347 self.log.info("Mobility Domain ID: {}".format(md)) 348 return md 349 350 def enable_80211r(self, iface, md): 351 """Enable 802.11r for one single radio. 352 353 Args: 354 iface: index number of wifi-iface. 355 2: radio1 356 3: radio0 357 md: mobility domain. a 4-character hexadecimal ID. 358 Raises: 359 TestSkip if 2g or 5g radio is not up or 802.11r is not enabled. 360 """ 361 str_output = self.ssh.run("wifi status").stdout 362 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 363 Loader=yaml.SafeLoader) 364 # Check if the radio is up. 365 if iface == OpenWrtWifiSetting.IFACE_2G: 366 if wifi_status[self.radios[1]]["up"]: 367 self.log.info("2g network is ENABLED") 368 else: 369 raise signals.TestSkip("2g network is NOT ENABLED") 370 elif iface == OpenWrtWifiSetting.IFACE_5G: 371 if wifi_status[self.radios[0]]["up"]: 372 self.log.info("5g network is ENABLED") 373 else: 374 raise signals.TestSkip("5g network is NOT ENABLED") 375 376 # Setup 802.11r. 377 self.ssh.run( 378 "uci set wireless.@wifi-iface[{}].ieee80211r='1'".format(iface)) 379 self.ssh.run( 380 "uci set wireless.@wifi-iface[{}].ft_psk_generate_local='1'" 381 .format(iface)) 382 self.ssh.run( 383 "uci set wireless.@wifi-iface[{}].mobility_domain='{}'" 384 .format(iface, md)) 385 self.ssh.run( 386 "uci commit wireless") 387 self.ssh.run("wifi") 388 389 # Check if 802.11r is enabled. 390 result = self.ssh.run( 391 "uci get wireless.@wifi-iface[{}].ieee80211r".format(iface)).stdout 392 if result == "1": 393 self.log.info("802.11r is ENABLED") 394 else: 395 raise signals.TestSkip("802.11r is NOT ENABLED") 396 397 def generate_wireless_configs(self, wifi_configs): 398 """Generate wireless configs to configure. 399 400 Converts wifi_configs from configure_ap() to a list of 'WirelessConfig' 401 objects. Each object represents a wifi network to configure on the AP. 402 403 Args: 404 wifi_configs: Network list of different security types and bands. 405 406 Returns: 407 wireless configuration for openwrt AP. 408 """ 409 num_2g = 1 410 num_5g = 1 411 wireless_configs = [] 412 413 for i in range(len(wifi_configs)): 414 if hostapd_constants.BAND_2G in wifi_configs[i]: 415 config = wifi_configs[i][hostapd_constants.BAND_2G] 416 if config["security"] == PSK_SECURITY: 417 wireless_configs.append( 418 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 419 config["SSID"], 420 config["security"], 421 hostapd_constants.BAND_2G, 422 password=config["password"], 423 hidden=config["hiddenSSID"], 424 ieee80211w=config["ieee80211w"])) 425 elif config["security"] == PSK1_SECURITY: 426 wireless_configs.append( 427 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 428 config["SSID"], 429 config["security"], 430 hostapd_constants.BAND_2G, 431 password=config["password"], 432 hidden=config["hiddenSSID"], 433 ieee80211w=config["ieee80211w"])) 434 elif config["security"] == WEP_SECURITY: 435 wireless_configs.append( 436 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 437 config["SSID"], 438 config["security"], 439 hostapd_constants.BAND_2G, 440 wep_key=config["wepKeys"][0], 441 hidden=config["hiddenSSID"])) 442 elif config["security"] == OPEN_SECURITY: 443 wireless_configs.append( 444 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 445 config["SSID"], 446 config["security"], 447 hostapd_constants.BAND_2G, 448 hidden=config["hiddenSSID"])) 449 elif config["security"] == OWE_SECURITY: 450 wireless_configs.append( 451 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 452 config["SSID"], 453 config["security"], 454 hostapd_constants.BAND_2G, 455 hidden=config["hiddenSSID"], 456 ieee80211w=PMF_ENABLED)) 457 elif config["security"] == SAE_SECURITY: 458 wireless_configs.append( 459 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 460 config["SSID"], 461 config["security"], 462 hostapd_constants.BAND_2G, 463 password=config["password"], 464 hidden=config["hiddenSSID"], 465 ieee80211w=PMF_ENABLED)) 466 elif config["security"] == SAEMIXED_SECURITY: 467 wireless_configs.append( 468 wireless_config.WirelessConfig("%s%s" % (WIFI_2G, num_2g), 469 config["SSID"], 470 config["security"], 471 hostapd_constants.BAND_2G, 472 password=config["password"], 473 hidden=config["hiddenSSID"], 474 ieee80211w=config["ieee80211w"])) 475 elif config["security"] == ENT_SECURITY: 476 wireless_configs.append( 477 wireless_config.WirelessConfig( 478 "%s%s" % (WIFI_2G, num_2g), 479 config["SSID"], 480 config["security"], 481 hostapd_constants.BAND_2G, 482 radius_server_ip=config["radius_server_ip"], 483 radius_server_port=config["radius_server_port"], 484 radius_server_secret=config["radius_server_secret"], 485 hidden=config["hiddenSSID"])) 486 num_2g += 1 487 if hostapd_constants.BAND_5G in wifi_configs[i]: 488 config = wifi_configs[i][hostapd_constants.BAND_5G] 489 if config["security"] == PSK_SECURITY: 490 wireless_configs.append( 491 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 492 config["SSID"], 493 config["security"], 494 hostapd_constants.BAND_5G, 495 password=config["password"], 496 hidden=config["hiddenSSID"], 497 ieee80211w=config["ieee80211w"])) 498 elif config["security"] == PSK1_SECURITY: 499 wireless_configs.append( 500 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 501 config["SSID"], 502 config["security"], 503 hostapd_constants.BAND_5G, 504 password=config["password"], 505 hidden=config["hiddenSSID"], 506 ieee80211w=config["ieee80211w"])) 507 elif config["security"] == WEP_SECURITY: 508 wireless_configs.append( 509 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 510 config["SSID"], 511 config["security"], 512 hostapd_constants.BAND_5G, 513 wep_key=config["wepKeys"][0], 514 hidden=config["hiddenSSID"])) 515 elif config["security"] == OPEN_SECURITY: 516 wireless_configs.append( 517 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 518 config["SSID"], 519 config["security"], 520 hostapd_constants.BAND_5G, 521 hidden=config["hiddenSSID"])) 522 elif config["security"] == OWE_SECURITY: 523 wireless_configs.append( 524 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 525 config["SSID"], 526 config["security"], 527 hostapd_constants.BAND_5G, 528 hidden=config["hiddenSSID"], 529 ieee80211w=PMF_ENABLED)) 530 elif config["security"] == SAE_SECURITY: 531 wireless_configs.append( 532 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 533 config["SSID"], 534 config["security"], 535 hostapd_constants.BAND_5G, 536 password=config["password"], 537 hidden=config["hiddenSSID"], 538 ieee80211w=PMF_ENABLED)) 539 elif config["security"] == SAEMIXED_SECURITY: 540 wireless_configs.append( 541 wireless_config.WirelessConfig("%s%s" % (WIFI_5G, num_5g), 542 config["SSID"], 543 config["security"], 544 hostapd_constants.BAND_5G, 545 password=config["password"], 546 hidden=config["hiddenSSID"], 547 ieee80211w=config["ieee80211w"])) 548 elif config["security"] == ENT_SECURITY: 549 wireless_configs.append( 550 wireless_config.WirelessConfig( 551 "%s%s" % (WIFI_5G, num_5g), 552 config["SSID"], 553 config["security"], 554 hostapd_constants.BAND_5G, 555 radius_server_ip=config["radius_server_ip"], 556 radius_server_port=config["radius_server_port"], 557 radius_server_secret=config["radius_server_secret"], 558 hidden=config["hiddenSSID"])) 559 num_5g += 1 560 561 return wireless_configs 562 563 def get_wifi_network(self, security=None, band=None): 564 """Return first match wifi interface's config. 565 566 Args: 567 security: psk2 or none 568 band: '2g' or '5g' 569 570 Returns: 571 A dict contains match wifi interface's config. 572 """ 573 574 for wifi_iface in self.wireless_setting.wireless_configs: 575 match_list = [] 576 wifi_network = wifi_iface.__dict__ 577 if security: 578 match_list.append(security == wifi_network["security"]) 579 if band: 580 match_list.append(band == wifi_network["band"]) 581 582 if all(match_list): 583 wifi_network["SSID"] = wifi_network["ssid"] 584 if not wifi_network["password"]: 585 del wifi_network["password"] 586 return wifi_network 587 return None 588 589 def get_wifi_status(self): 590 """Check if radios are up. Default are 2G and 5G bands. 591 592 Returns: 593 True if both radios are up. False if not. 594 """ 595 status = True 596 for radio in self.radios: 597 try: 598 str_output = self.ssh.run("wifi status %s" % radio).stdout 599 wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""), 600 Loader=yaml.SafeLoader) 601 status = wifi_status[radio]["up"] and status 602 except: 603 self.log.info("Failed to make ssh connection to the OpenWrt") 604 return False 605 return status 606 607 def verify_wifi_status(self, timeout=20): 608 """Ensure wifi interfaces are ready. 609 610 Args: 611 timeout: An integer that is the number of times to try 612 wait for interface ready. 613 Returns: 614 True if both radios are up. False if not. 615 """ 616 start_time = time.time() 617 end_time = start_time + timeout 618 while time.time() < end_time: 619 if self.get_wifi_status(): 620 return True 621 time.sleep(1) 622 return False 623 624 def get_model_name(self): 625 """Get Openwrt model name. 626 627 Returns: 628 A string include device brand and model. e.g. NETGEAR_R8000 629 """ 630 out = self.ssh.run(SYSTEM_INFO_CMD).stdout.split("\n") 631 for line in out: 632 if "board_name" in line: 633 model = (line.split()[1].strip("\",").split(",")) 634 return "_".join(map(lambda i: i.upper(), model)) 635 self.log.info("Failed to retrieve OpenWrt model information.") 636 return None 637 638 def close(self): 639 """Reset wireless and network settings to default and stop AP.""" 640 if self.network_setting.config: 641 self.network_setting.cleanup_network_settings() 642 if self.wireless_setting: 643 self.wireless_setting.cleanup_wireless_settings() 644 645 def close_ssh(self): 646 """Close SSH connection to AP.""" 647 self.ssh.close() 648 649 def reboot(self): 650 """Reboot Openwrt.""" 651 self.ssh.run("reboot") 652 653