#!/usr/bin/env python3
#
#   Copyright 2017 - Google
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
    Base Class for Defining Common WiFi Test Functionality
"""

import copy
import os
import time

from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.base_test import BaseTestClass
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib import hostapd_bss_settings
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib import hostapd_security
from acts.keys import Config
from acts_contrib.test_utils.net import net_test_utils as nutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils

from mobly.base_test import STAGE_NAME_TEARDOWN_CLASS

WifiEnums = wutils.WifiEnums
AP_1 = 0
AP_2 = 1
MAX_AP_COUNT = 2


class WifiBaseTest(BaseTestClass):
    """Base test class holding setup/teardown and AP helpers for WiFi tests."""

    def __init__(self, configs):
        """Initializes packet-log and tcpdump bookkeeping.

        Args:
            configs: Test configuration forwarded to BaseTestClass.
        """
        super().__init__(configs)
        self.enable_packet_log = False
        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G
        # List of (android_device, tcpdump_process) tuples.
        self.tcpdump_proc = []
        self.packet_log_pid = {}

    def setup_class(self):
        """Resets attenuators, configures sniffing and initializes devices.

        For every android device the wifi country code is set. It is read
        from the optional 'country_code_file' user param when present and
        defaults to US otherwise.
        """
        if hasattr(self, 'attenuators') and self.attenuators:
            for attenuator in self.attenuators:
                attenuator.set_atten(0)
        opt_param = ["country_code_file"]
        self.unpack_userparams(opt_param_names=opt_param)
        if self.enable_packet_log and hasattr(self, "packet_capture"):
            self.packet_logger = self.packet_capture[0]
            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                wutils.wifi_test_device_init(ad)
                if hasattr(self, "country_code_file"):
                    if isinstance(self.country_code_file, list):
                        self.country_code_file = self.country_code_file[0]
                    # Fall back to a path relative to the config directory.
                    if not os.path.isfile(self.country_code_file):
                        self.country_code_file = os.path.join(
                            self.user_params[Config.key_config_path.value],
                            self.country_code_file)
                    self.country_code = utils.load_config(
                        self.country_code_file)["country"]
                else:
                    self.country_code = WifiEnums.CountryCode.US
                wutils.set_wifi_country_code(ad, self.country_code)

    def setup_test(self):
        """Starts wlan logs, per-device tcpdump and the packet capture."""
        if hasattr(self, "android_devices"):
            wutils.start_all_wlan_logs(self.android_devices)
            self.tcpdump_proc = []
            for ad in self.android_devices:
                proc = nutils.start_tcpdump(ad, self.test_name)
                self.tcpdump_proc.append((ad, proc))
        if hasattr(self, "packet_logger"):
            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
                                                    self.test_name)

    def teardown_test(self):
        """Stops the logging started in setup_test without pulling dumps."""
        if hasattr(self, "android_devices"):
            wutils.stop_all_wlan_logs(self.android_devices)
            for ad, proc in self.tcpdump_proc:
                nutils.stop_tcpdump(ad,
                                    proc,
                                    self.test_name,
                                    pull_dump=False)
            self.tcpdump_proc = []
        if hasattr(self, "packet_logger") and self.packet_log_pid:
            wutils.stop_pcap(self.packet_logger,
                             self.packet_log_pid,
                             test_status=True)
            self.packet_log_pid = {}

    def teardown_class(self):
        """Runs the base teardown, then takes fuchsia device bug reports."""
        begin_time = utils.get_current_epoch_time()
        super().teardown_class()
        for device in getattr(self, "fuchsia_devices", []):
            device.take_bug_report(STAGE_NAME_TEARDOWN_CLASS, begin_time)

    def on_fail(self, test_name, begin_time):
        """Collects bug reports, logs and captures after a test failure.

        Args:
            test_name: Name of the test that failed.
            begin_time: Timestamp taken when the test started.
        """
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.take_bug_report(test_name, begin_time)
                ad.cat_adb_log(test_name, begin_time)
                wutils.get_ssrdumps(ad)
            wutils.stop_all_wlan_logs(self.android_devices)
            for ad in self.android_devices:
                wutils.get_wlan_logs(ad)
            for ad, proc in self.tcpdump_proc:
                nutils.stop_tcpdump(ad, proc, self.test_name)
            self.tcpdump_proc = []
        if hasattr(self, "packet_logger") and self.packet_log_pid:
            wutils.stop_pcap(self.packet_logger,
                             self.packet_log_pid,
                             test_status=False)
            self.packet_log_pid = {}

        # Gets a wlan_device log and calls the generic device fail on DUT.
        for device in getattr(self, "fuchsia_devices", []):
            self.on_device_fail(device, test_name, begin_time)

    def on_device_fail(self, device, test_name, begin_time):
        """Gets a generic device DUT bug report.

        This method takes a bug report if the device has the
        'take_bug_report_on_fail' config value, and if the flag is true. This
        method also power cycles if 'hard_reboot_on_fail' is True.

        Args:
            device: Generic device to gather logs from.
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """
        # A device without the attribute is treated as opted in.
        if (not hasattr(device, "take_bug_report_on_fail")
                or device.take_bug_report_on_fail):
            device.take_bug_report(test_name, begin_time)

        if hasattr(device,
                   "hard_reboot_on_fail") and device.hard_reboot_on_fail:
            device.reboot(reboot_type='hard', testbed_pdus=self.pdu_devices)

    def download_ap_logs(self):
        """Downloads the DHCP and hostapd logs from the access_point.

        Using the current TestClassContext and TestCaseContext this method pulls
        the DHCP and hostapd logs and outputs them to the correct path.
        """
        current_path = context.get_current_context().get_full_output_path()
        dhcp_full_out_path = os.path.join(current_path, "dhcp_log.txt")

        dhcp_log = self.access_point.get_dhcp_logs()
        if dhcp_log:
            # Context manager guarantees the file is closed if write fails.
            with open(dhcp_full_out_path, 'w') as dhcp_log_file:
                dhcp_log_file.write(dhcp_log)

        hostapd_logs = self.access_point.get_hostapd_logs()
        for interface in hostapd_logs:
            out_name = interface + "_hostapd_log.txt"
            hostapd_full_out_path = os.path.join(current_path, out_name)
            with open(hostapd_full_out_path, 'w') as hostapd_log_file:
                hostapd_log_file.write(hostapd_logs[interface])

    def get_psk_network(
            self,
            mirror_ap,
            reference_networks,
            hidden=False,
            same_ssid=False,
            security_mode=hostapd_constants.WPA2_STRING,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
        """Generates SSID and passphrase for a PSK network using random
        generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            reference_networks: List of PSK networks. A copy of the generated
                                2G/5G pair is appended once, or MAX_AP_COUNT
                                times when mirror_ap is set.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            security_mode: Value stored under "security" for both bands.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            passphrase_length_5g: Int, length of password for 5G network.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = ref_2g_ssid

            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
            ref_5g_passphrase = ref_2g_passphrase

        else:
            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)

            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)

        network_dict_2g = {
            "SSID": ref_2g_ssid,
            "security": security_mode,
            "password": ref_2g_passphrase,
            "hiddenSSID": hidden
        }

        network_dict_5g = {
            "SSID": ref_5g_ssid,
            "security": security_mode,
            "password": ref_5g_passphrase,
            "hiddenSSID": hidden
        }

        for _ in range(MAX_AP_COUNT):
            reference_networks.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def get_open_network(self,
                         mirror_ap,
                         open_network,
                         hidden=False,
                         same_ssid=False,
                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
                         security_mode='none'):
        """Generates SSIDs for a open network using a random generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            open_network: List of open networks. A copy of the generated
                          2G/5G pair is appended once, or MAX_AP_COUNT times
                          when mirror_ap is set.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            security_mode: 'none' for open and 'OWE' for WPA3 OWE.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            open_5g_ssid = open_2g_ssid

        else:
            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)

        network_dict_2g = {
            "SSID": open_2g_ssid,
            "security": security_mode,
            "hiddenSSID": hidden
        }

        network_dict_5g = {
            "SSID": open_5g_ssid,
            "security": security_mode,
            "hiddenSSID": hidden
        }

        for _ in range(MAX_AP_COUNT):
            open_network.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def get_wep_network(
            self,
            mirror_ap,
            networks,
            hidden=False,
            same_ssid=False,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
        """Generates SSID and passphrase for a WEP network using random
        generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            networks: List of WEP networks. A copy of the generated 2G/5G
                      pair is appended once, or MAX_AP_COUNT times when
                      mirror_ap is set.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            passphrase_length_5g: Int, length of password for 5G network.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        wep_security = hostapd_constants.WEP_STRING

        if same_ssid:
            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = ref_2g_ssid

            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
            ref_5g_passphrase = ref_2g_passphrase

        else:
            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)

            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
            ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g)

        network_dict_2g = {
            "SSID": ref_2g_ssid,
            "security": wep_security,
            "wepKeys": [ref_2g_passphrase] * 4,
            "hiddenSSID": hidden
        }

        # The 5G network uses its own key; it previously reused the 2G key,
        # which left passphrase_length_5g without any effect.
        network_dict_5g = {
            "SSID": ref_5g_ssid,
            "security": wep_security,
            "wepKeys": [ref_5g_passphrase] * 4,
            "hiddenSSID": hidden
        }

        for _ in range(MAX_AP_COUNT):
            networks.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def update_bssid(self, ap_instance, ap, network, band):
        """Get bssid and update network dictionary.

        Args:
            ap_instance: Accesspoint index that was configured.
            ap: Accesspoint object corresponding to ap_instance.
            network: Network dictionary.
            band: Wifi networks' band.

        """
        bssid = ap.get_bssid_from_ssid(network["SSID"], band)

        if network["security"] == hostapd_constants.WPA2_STRING:
            # TODO:(bamahadev) Change all occurrences of reference_networks
            # to wpa_networks.
            self.reference_networks[ap_instance][band]["bssid"] = bssid
        if network["security"] == hostapd_constants.WPA_STRING:
            self.wpa_networks[ap_instance][band]["bssid"] = bssid
        if network["security"] == hostapd_constants.WEP_STRING:
            self.wep_networks[ap_instance][band]["bssid"] = bssid
        if network["security"] == hostapd_constants.ENT_STRING:
            # The first enterprise network seen fills ent_networks; a later
            # one for the same AP/band goes to ent_networks_pwd.
            if "bssid" not in self.ent_networks[ap_instance][band]:
                self.ent_networks[ap_instance][band]["bssid"] = bssid
            else:
                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
        if network["security"] == 'none':
            self.open_network[ap_instance][band]["bssid"] = bssid

    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
        """Get bssid for a given SSID and add it to the network dictionary.

        Args:
            ap_instance: Accesspoint index that was configured.
            ap: Accesspoint object corresponding to ap_instance.
            networks_5g: List of 5g networks configured on the APs.
            networks_2g: List of 2g networks configured on the APs.

        """

        if not (networks_5g or networks_2g):
            return

        for network in networks_5g:
            # Entries holding only the channel setting carry no SSID.
            if 'channel' in network:
                continue
            self.update_bssid(ap_instance, ap, network,
                              hostapd_constants.BAND_5G)

        for network in networks_2g:
            if 'channel' in network:
                continue
            self.update_bssid(ap_instance, ap, network,
                              hostapd_constants.BAND_2G)

    def configure_openwrt_ap_and_start(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            channel_5g_ap2=None,
            channel_2g_ap2=None,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
            mirror_ap=False,
            hidden=False,
            same_ssid=False,
            open_network=False,
            wpa1_network=False,
            wpa_network=False,
            wep_network=False,
            ent_network=False,
            ent_network_pwd=False,
            owe_network=False,
            sae_network=False,
            saemixed_network=False,
            radius_conf_2g=None,
            radius_conf_5g=None,
            radius_conf_pwd=None,
            ap_count=1,
            ieee80211w=None):
        """Create, configure and start OpenWrt AP.

        Args:
            channel_5g: 5G channel to configure.
            channel_2g: 2G channel to configure.
            channel_5g_ap2: 5G channel to configure on AP2.
            channel_2g_ap2: 2G channel to configure on AP2.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_5g: Int, length of password for 5G network.
            mirror_ap: Boolean, configure the second AP with the same network
                       list as the first one.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same SSID.
            open_network: Boolean, to check if open network should be configured.
            wpa1_network: Boolean, to check if wpa1 network should be configured.
            wpa_network: Boolean, to check if wpa network should be configured.
            wep_network: Boolean, to check if wep network should be configured.
            ent_network: Boolean, to check if ent network should be configured.
            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
            owe_network: Boolean, to check if owe network should be configured.
            sae_network: Boolean, to check if sae network should be configured.
            saemixed_network: Boolean, to check if saemixed network should be configured.
            radius_conf_2g: dictionary with enterprise radius server details.
            radius_conf_5g: dictionary with enterprise radius server details.
            radius_conf_pwd: dictionary with enterprise radius server details.
            ap_count: APs to configure.
            ieee80211w: PMF to configure.

        Raises:
            ValueError: if ap_count is 1 while mirror_ap is set or while a
                        channel for AP2 is provided.
        """
        if mirror_ap and ap_count == 1:
            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
            raise ValueError(
                "ap_count cannot be 1 if channels of AP2 are provided.")
        # we are creating a channel list for 2G and 5G bands. The list is of
        # size 2 and this is based on the assumption that each testbed will have
        # at most 2 APs.
        if not channel_5g_ap2:
            channel_5g_ap2 = channel_5g
        if not channel_2g_ap2:
            channel_2g_ap2 = channel_2g
        channels_2g = [channel_2g, channel_2g_ap2]
        channels_5g = [channel_5g, channel_5g_ap2]

        self.reference_networks = []
        self.wpa1_networks = []
        self.wpa_networks = []
        self.wep_networks = []
        self.ent_networks = []
        self.ent_networks_pwd = []
        self.open_network = []
        self.owe_networks = []
        self.sae_networks = []
        self.saemixed_networks = []
        self.bssid_map = []
        for i in range(ap_count):
            network_list = []
            if wpa1_network:
                # Lengths are passed by keyword: get_psk_network takes
                # security_mode before the lengths, so passing them
                # positionally shifted every value by one parameter.
                wpa1_dict = self.get_psk_network(
                    mirror_ap,
                    self.wpa1_networks,
                    hidden,
                    same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
                wpa1_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
                wpa1_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
                self.wpa1_networks.append(wpa1_dict)
                network_list.append(wpa1_dict)
            if wpa_network:
                # Keyword arguments for the same reason as the WPA1 case.
                wpa_dict = self.get_psk_network(
                    mirror_ap,
                    self.reference_networks,
                    hidden,
                    same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
                wpa_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
                wpa_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
                self.wpa_networks.append(wpa_dict)
                network_list.append(wpa_dict)
            if wep_network:
                wep_dict = self.get_wep_network(mirror_ap, self.wep_networks,
                                                hidden, same_ssid,
                                                ssid_length_2g, ssid_length_5g)
                network_list.append(wep_dict)
            if ent_network:
                ent_dict = self.get_open_network(mirror_ap, self.ent_networks,
                                                 hidden, same_ssid,
                                                 ssid_length_2g,
                                                 ssid_length_5g)
                ent_dict["2g"]["security"] = "wpa2"
                ent_dict["2g"].update(radius_conf_2g)
                ent_dict["5g"]["security"] = "wpa2"
                ent_dict["5g"].update(radius_conf_5g)
                network_list.append(ent_dict)
            if ent_network_pwd:
                ent_pwd_dict = self.get_open_network(mirror_ap,
                                                     self.ent_networks_pwd,
                                                     hidden, same_ssid,
                                                     ssid_length_2g,
                                                     ssid_length_5g)
                ent_pwd_dict["2g"]["security"] = "wpa2"
                ent_pwd_dict["2g"].update(radius_conf_pwd)
                ent_pwd_dict["5g"]["security"] = "wpa2"
                ent_pwd_dict["5g"].update(radius_conf_pwd)
                network_list.append(ent_pwd_dict)
            if open_network:
                open_dict = self.get_open_network(mirror_ap, self.open_network,
                                                  hidden, same_ssid,
                                                  ssid_length_2g,
                                                  ssid_length_5g)
                network_list.append(open_dict)
            if owe_network:
                owe_dict = self.get_open_network(mirror_ap, self.owe_networks,
                                                 hidden, same_ssid,
                                                 ssid_length_2g,
                                                 ssid_length_5g, "OWE")
                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
                network_list.append(owe_dict)
            if sae_network:
                sae_dict = self.get_psk_network(mirror_ap, self.sae_networks,
                                                hidden, same_ssid,
                                                hostapd_constants.SAE_KEY_MGMT,
                                                ssid_length_2g, ssid_length_5g,
                                                passphrase_length_2g,
                                                passphrase_length_5g)
                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
                network_list.append(sae_dict)
            if saemixed_network:
                saemixed_dict = self.get_psk_network(
                    mirror_ap, self.saemixed_networks, hidden, same_ssid,
                    hostapd_constants.SAE_KEY_MGMT, ssid_length_2g,
                    ssid_length_5g, passphrase_length_2g, passphrase_length_5g)
                saemixed_dict[
                    hostapd_constants.BAND_2G]["security"] = "sae-mixed"
                saemixed_dict[
                    hostapd_constants.BAND_5G]["security"] = "sae-mixed"
                saemixed_dict[
                    hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
                saemixed_dict[
                    hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
                network_list.append(saemixed_dict)
            self.access_points[i].configure_ap(network_list, channels_2g[i],
                                               channels_5g[i])
            self.access_points[i].start_ap()
            self.bssid_map.append(
                self.access_points[i].get_bssids_for_wifi_networks())
            if mirror_ap:
                # The next AP reuses this network list, so stop afterwards.
                self.access_points[i + 1].configure_ap(network_list,
                                                       channels_2g[i + 1],
                                                       channels_5g[i + 1])
                self.access_points[i + 1].start_ap()
                self.bssid_map.append(
                    self.access_points[i + 1].get_bssids_for_wifi_networks())
                break

    def legacy_configure_ap_and_start(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
            hidden=False,
            same_ssid=False,
            mirror_ap=True,
            wpa_network=False,
            wep_network=False,
            ent_network=False,
            radius_conf_2g=None,
            radius_conf_5g=None,
            ent_network_pwd=False,
            radius_conf_pwd=None,
            ap_count=1):
        """Generates networks and starts hostapd on the legacy access points.

        A WPA2 network is always generated. Unless same_ssid is set, an open
        network and the optional WPA/WEP/enterprise networks are added too.
        The generated network lists are stored in user_params and mirrored
        onto attributes of this object.

        Args:
            channel_5g: 5G channel to configure.
            channel_2g: 2G channel to configure.
            max_2g_networks: Not read by this method.
            max_5g_networks: Not read by this method.
            ap_ssid_length_2g: Not read by this method.
            ap_passphrase_length_2g: Not read by this method.
            ap_ssid_length_5g: Not read by this method.
            ap_passphrase_length_5g: Not read by this method.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            mirror_ap: Boolean, repeat the first AP's configuration on the
                       second AP when ap_count is 2.
            wpa_network: Boolean, to check if wpa network should be configured.
            wep_network: Boolean, to check if wep network should be configured.
            ent_network: Boolean, to check if ent network should be configured.
            radius_conf_2g: dictionary with enterprise radius server details.
            radius_conf_5g: dictionary with enterprise radius server details.
            ent_network_pwd: Boolean, to check if ent pwd network should be
                             configured.
            radius_conf_pwd: dictionary with enterprise radius server details.
            ap_count: APs to configure.

        Raises:
            ValueError: if ap_count is 1 while mirror_ap is False.
        """
        config_count = 1

        # For example, the NetworkSelector tests use 2 APs and require that
        # both APs are not mirrored.
        if not mirror_ap and ap_count == 1:
            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")

        if not mirror_ap:
            config_count = ap_count

        self.user_params["reference_networks"] = []
        self.user_params["open_network"] = []
        if wpa_network:
            self.user_params["wpa_networks"] = []
        if wep_network:
            self.user_params["wep_networks"] = []
        if ent_network:
            self.user_params["ent_networks"] = []
        if ent_network_pwd:
            self.user_params["ent_networks_pwd"] = []

        # kill hostapd & dhcpd if the cleanup was not successful
        for ap_index in range(len(self.access_points)):
            self.log.debug("Check ap state and cleanup")
            self._cleanup_hostapd_and_dhcpd(ap_index)

        for count in range(config_count):

            # The first entry of each list carries the channel setting.
            network_list_2g = [{"channel": channel_2g}]
            network_list_5g = [{"channel": channel_5g}]

            networks_dict = self.get_psk_network(
                mirror_ap,
                self.user_params["reference_networks"],
                hidden=hidden,
                same_ssid=same_ssid)
            self.reference_networks = self.user_params["reference_networks"]

            network_list_2g.append(networks_dict["2g"])
            network_list_5g.append(networks_dict["5g"])

            # When same_ssid is set, only configure one set of WPA networks.
            # We cannot have more than one set because duplicate interface names
            # are not allowed.
            # TODO(bmahadev): Provide option to select the type of network,
            # instead of defaulting to WPA.
            if not same_ssid:
                networks_dict = self.get_open_network(
                    mirror_ap,
                    self.user_params["open_network"],
                    hidden=hidden,
                    same_ssid=same_ssid)
                self.open_network = self.user_params["open_network"]

                network_list_2g.append(networks_dict["2g"])
                network_list_5g.append(networks_dict["5g"])

                if wpa_network:
                    networks_dict = self.get_psk_network(
                        mirror_ap,
                        self.user_params["wpa_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid,
                        security_mode=hostapd_constants.WPA_STRING)
                    self.wpa_networks = self.user_params["wpa_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if wep_network:
                    networks_dict = self.get_wep_network(
                        mirror_ap,
                        self.user_params["wep_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    self.wep_networks = self.user_params["wep_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if ent_network:
                    networks_dict = self.get_open_network(
                        mirror_ap,
                        self.user_params["ent_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    networks_dict["2g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["2g"].update(radius_conf_2g)
                    networks_dict["5g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["5g"].update(radius_conf_5g)
                    self.ent_networks = self.user_params["ent_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if ent_network_pwd:
                    networks_dict = self.get_open_network(
                        mirror_ap,
                        self.user_params["ent_networks_pwd"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    networks_dict["2g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["2g"].update(radius_conf_pwd)
                    networks_dict["5g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["5g"].update(radius_conf_pwd)
                    self.ent_networks_pwd = self.user_params[
                        "ent_networks_pwd"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

            # _generate_legacy_ap_config pops entries off the list it is
            # given, so keep copies for the bssid lookup below.
            orig_network_list_5g = copy.copy(network_list_5g)
            orig_network_list_2g = copy.copy(network_list_2g)

            if len(network_list_5g) > 1:
                self.config_5g = self._generate_legacy_ap_config(
                    network_list_5g)
            if len(network_list_2g) > 1:
                self.config_2g = self._generate_legacy_ap_config(
                    network_list_2g)

            self.access_points[count].start_ap(self.config_2g)
            self.access_points[count].start_ap(self.config_5g)
            self.populate_bssid(count, self.access_points[count],
                                orig_network_list_5g, orig_network_list_2g)

        # Repeat configuration on the second router.
        if mirror_ap and ap_count == 2:
            self.access_points[AP_2].start_ap(self.config_2g)
            self.access_points[AP_2].start_ap(self.config_5g)
            self.populate_bssid(AP_2, self.access_points[AP_2],
                                orig_network_list_5g, orig_network_list_2g)

    def _kill_processes(self, ap, daemon):
        """ Kill hostapd and dhcpd daemons

        Args:
            ap: AP to cleanup
            daemon: process to kill

        Returns: True/False if killing process is successful
        """
        self.log.info("Killing %s" % daemon)
        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
        if pids.stdout:
            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
        time.sleep(3)
        # Check again after the grace period; any pid left means failure.
        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
        if pids.stdout:
            return False
        return True

    def _cleanup_hostapd_and_dhcpd(self, count):
        """ Check if AP was cleaned up properly

        Kill hostapd and dhcpd processes if cleanup was not successful in the
        last run. The AccessPoint object is re-initialized in place after a
        successful cleanup.

        Args:
            count: AP to check

        Raises:
            RuntimeError: if hostapd or dhcpd could not be killed
        """
        ap = self.access_points[count]
        phy_ifaces = ap.interfaces.get_physical_interface()
        # Interfaces named after generated SSIDs indicate leftover state.
        kill_hostapd = False
        for iface in phy_ifaces:
            if '2g_' in iface or '5g_' in iface or 'xg_' in iface:
                kill_hostapd = True
                break

        if not kill_hostapd:
            return

        self.log.debug("Cleanup AP")
        if not self._kill_processes(ap, 'hostapd') or \
            not self._kill_processes(ap, 'dhcpd'):
            # Raising a bare string is a TypeError in Python 3; raise a real
            # exception so the message reaches the caller.
            raise RuntimeError("Failed to cleanup AP")

        ap.__init__(self.user_params['AccessPoint'][count])

    def _generate_legacy_ap_config(self, network_list):
        """Builds a hostapd config from a channel entry and network dicts.

        Args:
            network_list: List whose first entry holds the "channel" and whose
                          second entry is the primary network; any further
                          entries become additional BSS settings. The first
                          two entries are popped off the list.

        Returns: The config created by hostapd_ap_preset.create_ap_preset.
        """
        bss_settings = []
        wlan_2g = self.access_points[AP_1].wlan_2g
        wlan_5g = self.access_points[AP_1].wlan_5g
        ap_settings = network_list.pop(0)
        # TODO:(bmahadev) This is a bug. We should not have to pop the first
        # network in the list and treat it as a separate case. Instead,
        # create_ap_preset() should be able to take NULL ssid and security and
        # build config based on the bss_Settings alone.
        hostapd_config_settings = network_list.pop(0)
        for network in network_list:
            if "password" in network:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            password=network["password"])))
            elif "wepKeys" in network:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            password=network["wepKeys"][0])))
            elif network["security"] == hostapd_constants.ENT_STRING:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            radius_server_ip=network["radius_server_ip"],
                            radius_server_port=network["radius_server_port"],
                            radius_server_secret=network[
                                "radius_server_secret"])))
            else:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"]))
        if "password" in hostapd_config_settings:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                security=hostapd_security.Security(
                    security_mode=hostapd_config_settings["security"],
                    password=hostapd_config_settings["password"]),
                bss_settings=bss_settings)
        elif "wepKeys" in hostapd_config_settings:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                security=hostapd_security.Security(
                    security_mode=hostapd_config_settings["security"],
                    password=hostapd_config_settings["wepKeys"][0]),
                bss_settings=bss_settings)
        else:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                bss_settings=bss_settings)
        return config

    def configure_packet_capture(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
        """Configure packet capture for 2G and 5G bands.

        Args:
            channel_5g: Channel to set the monitor mode to for 5G band.
            channel_2g: Channel to set the monitor mode to for 2G band.

        Raises:
            ValueError: if monitor mode could not be configured on a band.
        """
        # NOTE(review): this rebinds packet_capture from a list to its first
        # element, so a second call would index into the device object.
        self.packet_capture = self.packet_capture[0]
        result = self.packet_capture.configure_monitor_mode(
            hostapd_constants.BAND_2G, channel_2g)
        if not result:
            raise ValueError("Failed to configure channel for 2G band")

        result = self.packet_capture.configure_monitor_mode(
            hostapd_constants.BAND_5G, channel_5g)
        if not result:
            raise ValueError("Failed to configure channel for 5G band.")

    @staticmethod
    def wifi_test_wrap(fn):
        """Decorator that reruns a failing test case.

        The wrapped test is attempted once and then retried up to
        'wifi_auto_rerun' times (user param, default 3). An attempt counts as
        failed when it returns False or raises signals.TestFailure. Between
        attempts the test teardown and setup are run again.

        Args:
            fn: Test method to wrap.

        Returns: The wrapping function.
        """

        def _safe_wrap_test_case(self, *args, **kwargs):
            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
                                    self.log_begin_time.replace(' ', '-'))
            self.test_id = test_id
            self.result_detail = ""
            tries = int(self.user_params.get("wifi_auto_rerun", 3))
            for ad in self.android_devices:
                ad.log_path = self.log_path
            for i in range(tries + 1):
                result = True
                if i > 0:
                    log_string = "[Test Case] RETRY:%s %s" % (i,
                                                              self.test_name)
                    self.log.info(log_string)
                    self._teardown_test(self.test_name)
                    self._setup_test(self.test_name)
                try:
                    result = fn(self, *args, **kwargs)
                except signals.TestFailure as e:
                    self.log.warn("Error msg: %s" % e)
                    # Attach the detail to the caught signal; the previous
                    # code referenced an undefined name here.
                    if self.result_detail:
                        e.details = self.result_detail
                    result = False
                except signals.TestSignal as e:
                    if self.result_detail:
                        e.details = self.result_detail
                    raise
                except Exception as e:
                    self.log.exception(e)
                    asserts.fail(self.result_detail)
                if result is False:
                    if i < tries:
                        continue
                else:
                    break
            if result is not False:
                asserts.explicit_pass(self.result_detail)
            else:
                asserts.fail(self.result_detail)

        return _safe_wrap_test_case