#!/usr/bin/env python3
#
#   Copyright 2017 - Google
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
    Base Class for Defining Common WiFi Test Functionality
"""

import copy
import os
import time

from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.base_test import BaseTestClass
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib import hostapd_bss_settings
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib import hostapd_security
from acts.keys import Config
from acts_contrib.test_utils.net import net_test_utils as nutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils

from mobly.base_test import STAGE_NAME_TEARDOWN_CLASS

WifiEnums = wutils.WifiEnums
# Indices of the first and second access point in self.access_points.
AP_1 = 0
AP_2 = 1
# Upper bound on how many per-AP entries the get_*_network helpers append.
MAX_AP_COUNT = 2


class WifiBaseTest(BaseTestClass):
    """Base test class providing common WiFi setup, teardown and AP helpers."""

    def __init__(self, configs):
        """Initializes packet-log and tcpdump bookkeeping.

        Args:
            configs: Test configuration forwarded to BaseTestClass.
        """
        super().__init__(configs)
        self.enable_packet_log = False
        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G
        # List of (android_device, tcpdump_process) tuples for running dumps.
        self.tcpdump_proc = []
        # Handle returned by wutils.start_pcap; empty when no capture runs.
        self.packet_log_pid = {}

    def setup_class(self):
        """Prepares attenuators, packet capture and Android devices.

        Resets every attenuator to 0, optionally puts the first packet capture
        device into monitor mode, and for each Android device runs the WiFi
        init, sets the country code and applies any flagged features.
        """
        if hasattr(self, 'attenuators') and self.attenuators:
            for attenuator in self.attenuators:
                attenuator.set_atten(0)
        opt_param = ["country_code_file"]
        self.unpack_userparams(opt_param_names=opt_param)
        if self.enable_packet_log and hasattr(self, "packet_capture"):
            self.packet_logger = self.packet_capture[0]
            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                wutils.wifi_test_device_init(ad)
                if hasattr(self, "country_code_file"):
                    if isinstance(self.country_code_file, list):
                        self.country_code_file = self.country_code_file[0]
                    # Fall back to a path relative to the config directory.
                    if not os.path.isfile(self.country_code_file):
                        self.country_code_file = os.path.join(
                            self.user_params[Config.key_config_path.value],
                            self.country_code_file)
                    self.country_code = utils.load_config(
                        self.country_code_file)["country"]
                else:
                    self.country_code = WifiEnums.CountryCode.US
                wutils.set_wifi_country_code(ad, self.country_code)

                if hasattr(self, "flagged_features"):
                    self._configure_flagged_features(ad, self.flagged_features)

    def setup_test(self):
        """Starts wlan logs, per-device tcpdump and the packet capture."""
        if hasattr(self, "android_devices"):
            wutils.start_all_wlan_logs(self.android_devices)
        self.tcpdump_proc = []
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                proc = nutils.start_tcpdump(ad, self.test_name)
                self.tcpdump_proc.append((ad, proc))
        if hasattr(self, "packet_logger"):
            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
                                                    self.test_name)

    def teardown_test(self):
        """Stops wlan logs, tcpdump (without pulling) and the packet capture."""
        if hasattr(self, "android_devices"):
            wutils.stop_all_wlan_logs(self.android_devices)
            for ad, proc in self.tcpdump_proc:
                nutils.stop_tcpdump(ad,
                                    proc,
                                    self.test_name,
                                    pull_dump=False)
            self.tcpdump_proc = []
        if hasattr(self, "packet_logger") and self.packet_log_pid:
            wutils.stop_pcap(self.packet_logger,
                             self.packet_log_pid,
                             test_status=True)
            self.packet_log_pid = {}

    def teardown_class(self):
        """Runs the base teardown, then takes a bug report on Fuchsia devices."""
        begin_time = utils.get_current_epoch_time()
        super().teardown_class()
        for device in getattr(self, "fuchsia_devices", []):
            device.take_bug_report(STAGE_NAME_TEARDOWN_CLASS, begin_time)

    def on_fail(self, test_name, begin_time):
        """Collects logs and dumps from all devices after a test failure.

        Args:
            test_name: Name of the test that failed.
            begin_time: Timestamp taken when the test started.
        """
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.take_bug_report(test_name, begin_time)
                ad.cat_adb_log(test_name, begin_time)
                wutils.get_ssrdumps(ad)
            wutils.stop_all_wlan_logs(self.android_devices)
            for ad in self.android_devices:
                wutils.get_wlan_logs(ad)
            for ad, proc in self.tcpdump_proc:
                nutils.stop_tcpdump(ad, proc, self.test_name)
            self.tcpdump_proc = []
        if hasattr(self, "packet_logger") and self.packet_log_pid:
            wutils.stop_pcap(self.packet_logger,
                             self.packet_log_pid,
                             test_status=False)
            self.packet_log_pid = {}

        # Gets a wlan_device log and calls the generic device fail on DUT.
        for device in getattr(self, "fuchsia_devices", []):
            self.on_device_fail(device, test_name, begin_time)

    def on_device_fail(self, device, test_name, begin_time):
        """Gets a generic device DUT bug report.

        This method takes a bug report if the device has the
        'take_bug_report_on_fail' config value, and if the flag is true. This
        method also power cycles if 'hard_reboot_on_fail' is True.

        Args:
            device: Generic device to gather logs from.
            test_name: Name of the test that triggered this function.
            begin_time: Logline format timestamp taken when the test started.
        """
        # A device without the attribute is treated as opted in.
        if getattr(device, "take_bug_report_on_fail", True):
            device.take_bug_report(test_name, begin_time)

        if getattr(device, "hard_reboot_on_fail", False):
            device.reboot(reboot_type='hard', testbed_pdus=self.pdu_devices)

    def download_ap_logs(self):
        """Downloads the DHCP and hostapd logs from the access_point.

        Using the current TestClassContext and TestCaseContext this method pulls
        the DHCP and hostapd logs and outputs them to the correct path.
        """
        current_path = context.get_current_context().get_full_output_path()
        dhcp_full_out_path = os.path.join(current_path, "dhcp_log.txt")

        dhcp_log = self.access_point.get_dhcp_logs()
        if dhcp_log:
            # Context manager guarantees the file is closed if write() fails.
            with open(dhcp_full_out_path, 'w') as dhcp_log_file:
                dhcp_log_file.write(dhcp_log)

        hostapd_logs = self.access_point.get_hostapd_logs()
        for interface in hostapd_logs:
            out_name = interface + "_hostapd_log.txt"
            hostapd_full_out_path = os.path.join(current_path, out_name)
            with open(hostapd_full_out_path, 'w') as hostapd_log_file:
                hostapd_log_file.write(hostapd_logs[interface])

    def get_psk_network(
            self,
            mirror_ap,
            reference_networks,
            hidden=False,
            same_ssid=False,
            security_mode=hostapd_constants.WPA2_STRING,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
        """Generates SSID and passphrase for a WPA2 network using random
        generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            reference_networks: List of PSK networks. Copies of the generated
                                networks are appended to it in place.
            hidden: Boolean, stored as "hiddenSSID" in each network.
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            security_mode: Value stored as "security" in each network.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            passphrase_length_5g: Int, length of password for 5G network.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            # Both bands share one SSID and one passphrase.
            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = ref_2g_ssid

            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
            ref_5g_passphrase = ref_2g_passphrase

        else:
            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)

            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)

        network_dict_2g = {
            "SSID": ref_2g_ssid,
            "security": security_mode,
            "password": ref_2g_passphrase,
            "hiddenSSID": hidden
        }

        network_dict_5g = {
            "SSID": ref_5g_ssid,
            "security": security_mode,
            "password": ref_5g_passphrase,
            "hiddenSSID": hidden
        }

        # One entry when the APs are not mirrored, MAX_AP_COUNT otherwise.
        for _ in range(MAX_AP_COUNT):
            reference_networks.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def get_open_network(self,
                         mirror_ap,
                         open_network,
                         hidden=False,
                         same_ssid=False,
                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
                         security_mode='none'):
        """Generates SSIDs for a open network using a random generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            open_network: List of open networks. Copies of the generated
                          networks are appended to it in place.
            hidden: Boolean, stored as "hiddenSSID" in each network.
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            security_mode: 'none' for open and 'OWE' for WPA3 OWE.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            open_5g_ssid = open_2g_ssid

        else:
            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)

        network_dict_2g = {
            "SSID": open_2g_ssid,
            "security": security_mode,
            "hiddenSSID": hidden
        }

        network_dict_5g = {
            "SSID": open_5g_ssid,
            "security": security_mode,
            "hiddenSSID": hidden
        }

        # One entry when the APs are not mirrored, MAX_AP_COUNT otherwise.
        for _ in range(MAX_AP_COUNT):
            open_network.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def get_wep_network(
            self,
            mirror_ap,
            networks,
            hidden=False,
            same_ssid=False,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
        """Generates SSID and passphrase for a WEP network using random
        generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            networks: List of WEP networks. Copies of the generated networks
                      are appended to it in place.
            hidden: Boolean, stored as "hiddenSSID" in each network.
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_2g: Int, length of the WEP key (hex string).
            passphrase_length_5g: Int, accepted for signature parity; it is
                                  currently unused because the 5G network
                                  reuses the 2G key.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = ref_2g_ssid
        else:
            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)

        wep_key = utils.rand_hex_str(passphrase_length_2g)

        network_dict_2g = {
            "SSID": ref_2g_ssid,
            "security": hostapd_constants.WEP_STRING,
            "wepKeys": [wep_key] * 4,
            "hiddenSSID": hidden
        }

        # NOTE(review): the 5G network has always been given the 2G key here;
        # behavior is kept as is - confirm whether a separate 5G key of
        # passphrase_length_5g hex digits was intended.
        network_dict_5g = {
            "SSID": ref_5g_ssid,
            "security": hostapd_constants.WEP_STRING,
            "wepKeys": [wep_key] * 4,
            "hiddenSSID": hidden
        }

        # One entry when the APs are not mirrored, MAX_AP_COUNT otherwise.
        for _ in range(MAX_AP_COUNT):
            networks.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def update_bssid(self, ap_instance, ap, network, band):
        """Get bssid and update network dictionary.

        Args:
            ap_instance: Accesspoint index that was configured.
            ap: Accesspoint object corresponding to ap_instance.
            network: Network dictionary.
            band: Wifi networks' band.

        """
        bssid = ap.get_bssid_from_ssid(network["SSID"], band)
        security = network["security"]

        if security == hostapd_constants.WPA2_STRING:
            # TODO:(bamahadev) Change all occurrences of reference_networks
            # to wpa_networks.
            self.reference_networks[ap_instance][band]["bssid"] = bssid
        if security == hostapd_constants.WPA_STRING:
            self.wpa_networks[ap_instance][band]["bssid"] = bssid
        if security == hostapd_constants.WEP_STRING:
            self.wep_networks[ap_instance][band]["bssid"] = bssid
        if security == hostapd_constants.ENT_STRING:
            # The first ENT network seen fills ent_networks; once that entry
            # has a bssid, later ones are recorded in ent_networks_pwd.
            if "bssid" not in self.ent_networks[ap_instance][band]:
                self.ent_networks[ap_instance][band]["bssid"] = bssid
            else:
                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
        if security == 'none':
            self.open_network[ap_instance][band]["bssid"] = bssid

    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
        """Get bssid for a given SSID and add it to the network dictionary.

        Args:
            ap_instance: Accesspoint index that was configured.
            ap: Accesspoint object corresponding to ap_instance.
            networks_5g: List of 5g networks configured on the APs.
            networks_2g: List of 2g networks configured on the APs.

        """

        if not (networks_5g or networks_2g):
            return

        for network in networks_5g:
            # Entries carrying a 'channel' key are AP settings, not networks.
            if 'channel' in network:
                continue
            self.update_bssid(ap_instance, ap, network,
                              hostapd_constants.BAND_5G)

        for network in networks_2g:
            if 'channel' in network:
                continue
            self.update_bssid(ap_instance, ap, network,
                              hostapd_constants.BAND_2G)

    def configure_openwrt_ap_and_start(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            channel_5g_ap2=None,
            channel_2g_ap2=None,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
            mirror_ap=False,
            hidden=False,
            same_ssid=False,
            open_network=False,
            wpa1_network=False,
            wpa_network=False,
            wep_network=False,
            ent_network=False,
            ent_network_pwd=False,
            owe_network=False,
            sae_network=False,
            saemixed_network=False,
            radius_conf_2g=None,
            radius_conf_5g=None,
            radius_conf_pwd=None,
            ap_count=1,
            ieee80211w=None):
        """Create, configure and start OpenWrt AP.

        Args:
            channel_5g: 5G channel to configure.
            channel_2g: 2G channel to configure.
            channel_5g_ap2: 5G channel to configure on AP2.
            channel_2g_ap2: 2G channel to configure on AP2.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_5g: Int, length of password for 5G network.
            mirror_ap: Boolean, configure the next AP with the same networks.
            hidden: Boolean, whether the generated networks are hidden.
            same_ssid: Boolean, determines if both bands on AP use the same SSID.
            open_network: Boolean, to check if open network should be configured.
            wpa1_network: Boolean, to check if wpa1 network should be configured.
            wpa_network: Boolean, to check if wpa network should be configured.
            wep_network: Boolean, to check if wep network should be configured.
            ent_network: Boolean, to check if ent network should be configured.
            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
            owe_network: Boolean, to check if owe network should be configured.
            sae_network: Boolean, to check if sae network should be configured.
            saemixed_network: Boolean, to check if saemixed network should be configured.
            radius_conf_2g: dictionary with enterprise radius server details.
            radius_conf_5g: dictionary with enterprise radius server details.
            radius_conf_pwd: dictionary with enterprise radius server details.
            ap_count: APs to configure.
            ieee80211w: PMF to configure.

        Raises:
            ValueError: if mirror_ap is set or AP2 channels are given while
                        ap_count is 1.
        """
        if mirror_ap and ap_count == 1:
            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
            raise ValueError(
                "ap_count cannot be 1 if channels of AP2 are provided.")
        # we are creating a channel list for 2G and 5G bands. The list is of
        # size 2 and this is based on the assumption that each testbed will have
        # at most 2 APs.
        if not channel_5g_ap2:
            channel_5g_ap2 = channel_5g
        if not channel_2g_ap2:
            channel_2g_ap2 = channel_2g
        channels_2g = [channel_2g, channel_2g_ap2]
        channels_5g = [channel_5g, channel_5g_ap2]

        self.reference_networks = []
        self.wpa1_networks = []
        self.wpa_networks = []
        self.wep_networks = []
        self.ent_networks = []
        self.ent_networks_pwd = []
        self.open_network = []
        self.owe_networks = []
        self.sae_networks = []
        self.saemixed_networks = []
        self.bssid_map = []
        for i in range(ap_count):
            network_list = []
            if wpa1_network:
                # Keyword arguments keep the lengths from shifting into
                # get_psk_network's security_mode slot, as they did when the
                # call was positional.
                wpa1_dict = self.get_psk_network(
                    mirror_ap,
                    self.wpa1_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
                wpa1_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
                wpa1_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
                # NOTE(review): get_psk_network already appended copies to
                # self.wpa1_networks; this adds the returned dict as well.
                self.wpa1_networks.append(wpa1_dict)
                network_list.append(wpa1_dict)
            if wpa_network:
                wpa_dict = self.get_psk_network(
                    mirror_ap,
                    self.reference_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
                wpa_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
                wpa_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
                self.wpa_networks.append(wpa_dict)
                network_list.append(wpa_dict)
            if wep_network:
                wep_dict = self.get_wep_network(
                    mirror_ap,
                    self.wep_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g)
                network_list.append(wep_dict)
            if ent_network:
                ent_dict = self.get_open_network(
                    mirror_ap,
                    self.ent_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g)
                ent_dict["2g"]["security"] = "wpa2"
                ent_dict["2g"].update(radius_conf_2g)
                ent_dict["5g"]["security"] = "wpa2"
                ent_dict["5g"].update(radius_conf_5g)
                network_list.append(ent_dict)
            if ent_network_pwd:
                ent_pwd_dict = self.get_open_network(
                    mirror_ap,
                    self.ent_networks_pwd,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g)
                ent_pwd_dict["2g"]["security"] = "wpa2"
                ent_pwd_dict["2g"].update(radius_conf_pwd)
                ent_pwd_dict["5g"]["security"] = "wpa2"
                ent_pwd_dict["5g"].update(radius_conf_pwd)
                network_list.append(ent_pwd_dict)
            if open_network:
                open_dict = self.get_open_network(
                    mirror_ap,
                    self.open_network,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g)
                network_list.append(open_dict)
            if owe_network:
                owe_dict = self.get_open_network(
                    mirror_ap,
                    self.owe_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    security_mode="OWE")
                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
                network_list.append(owe_dict)
            if sae_network:
                sae_dict = self.get_psk_network(
                    mirror_ap,
                    self.sae_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    security_mode=hostapd_constants.SAE_KEY_MGMT,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
                network_list.append(sae_dict)
            if saemixed_network:
                saemixed_dict = self.get_psk_network(
                    mirror_ap,
                    self.saemixed_networks,
                    hidden=hidden,
                    same_ssid=same_ssid,
                    security_mode=hostapd_constants.SAE_KEY_MGMT,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                saemixed_dict[
                    hostapd_constants.BAND_2G]["security"] = "sae-mixed"
                saemixed_dict[
                    hostapd_constants.BAND_5G]["security"] = "sae-mixed"
                saemixed_dict[
                    hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
                saemixed_dict[
                    hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
                network_list.append(saemixed_dict)
            self.access_points[i].configure_ap(network_list, channels_2g[i],
                                               channels_5g[i])
            self.access_points[i].start_ap()
            self.bssid_map.append(
                self.access_points[i].get_bssids_for_wifi_networks())
            if mirror_ap:
                # Reuse the same network list on the next AP, then stop: the
                # mirrored pair is the whole configuration.
                self.access_points[i + 1].configure_ap(network_list,
                                                       channels_2g[i + 1],
                                                       channels_5g[i + 1])
                self.access_points[i + 1].start_ap()
                self.bssid_map.append(
                    self.access_points[i + 1].get_bssids_for_wifi_networks())
                break

    def legacy_configure_ap_and_start(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
            hidden=False,
            same_ssid=False,
            mirror_ap=True,
            wpa_network=False,
            wep_network=False,
            ent_network=False,
            radius_conf_2g=None,
            radius_conf_5g=None,
            ent_network_pwd=False,
            radius_conf_pwd=None,
            ap_count=1):
        """Generates networks and starts hostapd on the legacy access points.

        A WPA2 reference network is always generated. Unless same_ssid is set,
        an open network and any of the optional WPA/WEP/enterprise networks
        are added as well. The generated lists are stored both in
        self.user_params and on the matching instance attributes.

        Args:
            channel_5g: 5G channel to configure.
            channel_2g: 2G channel to configure.
            max_2g_networks: Not used by this method.
            max_5g_networks: Not used by this method.
            ap_ssid_length_2g: Not used by this method.
            ap_passphrase_length_2g: Not used by this method.
            ap_ssid_length_5g: Not used by this method.
            ap_passphrase_length_5g: Not used by this method.
            hidden: Boolean, whether the generated networks are hidden.
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            mirror_ap: Boolean, True to start the same config on the second AP
                       (when ap_count is 2); False to generate a separate
                       config per AP.
            wpa_network: Boolean, to check if wpa network should be configured.
            wep_network: Boolean, to check if wep network should be configured.
            ent_network: Boolean, to check if ent network should be configured.
            radius_conf_2g: dictionary with enterprise radius server details.
            radius_conf_5g: dictionary with enterprise radius server details.
            ent_network_pwd: Boolean, to check if ent pwd network should be
                             configured.
            radius_conf_pwd: dictionary with enterprise radius server details.
            ap_count: APs to configure.

        Raises:
            ValueError: if mirror_ap is False while ap_count is 1.
        """
        config_count = 1

        # For example, the NetworkSelector tests use 2 APs and require that
        # both APs are not mirrored.
        if not mirror_ap and ap_count == 1:
            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")

        if not mirror_ap:
            config_count = ap_count

        self.user_params["reference_networks"] = []
        self.user_params["open_network"] = []
        if wpa_network:
            self.user_params["wpa_networks"] = []
        if wep_network:
            self.user_params["wep_networks"] = []
        if ent_network:
            self.user_params["ent_networks"] = []
        if ent_network_pwd:
            self.user_params["ent_networks_pwd"] = []

        # kill hostapd & dhcpd if the cleanup was not successful
        for i in range(len(self.access_points)):
            self.log.debug("Check ap state and cleanup")
            self._cleanup_hostapd_and_dhcpd(i)

        for count in range(config_count):

            # The first entry of each list carries the channel; the networks
            # follow it (see _generate_legacy_ap_config).
            network_list_2g = [{"channel": channel_2g}]
            network_list_5g = [{"channel": channel_5g}]

            networks_dict = self.get_psk_network(
                mirror_ap,
                self.user_params["reference_networks"],
                hidden=hidden,
                same_ssid=same_ssid)
            self.reference_networks = self.user_params["reference_networks"]

            network_list_2g.append(networks_dict["2g"])
            network_list_5g.append(networks_dict["5g"])

            # When same_ssid is set, only configure one set of WPA networks.
            # We cannot have more than one set because duplicate interface names
            # are not allowed.
            # TODO(bmahadev): Provide option to select the type of network,
            # instead of defaulting to WPA.
            if not same_ssid:
                networks_dict = self.get_open_network(
                    mirror_ap,
                    self.user_params["open_network"],
                    hidden=hidden,
                    same_ssid=same_ssid)
                self.open_network = self.user_params["open_network"]

                network_list_2g.append(networks_dict["2g"])
                network_list_5g.append(networks_dict["5g"])

                if wpa_network:
                    networks_dict = self.get_psk_network(
                        mirror_ap,
                        self.user_params["wpa_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid,
                        security_mode=hostapd_constants.WPA_STRING)
                    self.wpa_networks = self.user_params["wpa_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if wep_network:
                    networks_dict = self.get_wep_network(
                        mirror_ap,
                        self.user_params["wep_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    self.wep_networks = self.user_params["wep_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if ent_network:
                    networks_dict = self.get_open_network(
                        mirror_ap,
                        self.user_params["ent_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    networks_dict["2g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["2g"].update(radius_conf_2g)
                    networks_dict["5g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["5g"].update(radius_conf_5g)
                    self.ent_networks = self.user_params["ent_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if ent_network_pwd:
                    networks_dict = self.get_open_network(
                        mirror_ap,
                        self.user_params["ent_networks_pwd"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    networks_dict["2g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["2g"].update(radius_conf_pwd)
                    networks_dict["5g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["5g"].update(radius_conf_pwd)
                    self.ent_networks_pwd = self.user_params[
                        "ent_networks_pwd"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

            # _generate_legacy_ap_config pops entries off the list it is
            # given, so keep shallow copies for populate_bssid.
            orig_network_list_5g = copy.copy(network_list_5g)
            orig_network_list_2g = copy.copy(network_list_2g)

            if len(network_list_5g) > 1:
                self.config_5g = self._generate_legacy_ap_config(
                    network_list_5g)
            if len(network_list_2g) > 1:
                self.config_2g = self._generate_legacy_ap_config(
                    network_list_2g)

            self.access_points[count].start_ap(self.config_2g)
            self.access_points[count].start_ap(self.config_5g)
            self.populate_bssid(count, self.access_points[count],
                                orig_network_list_5g, orig_network_list_2g)

        # Repeat configuration on the second router.
        if mirror_ap and ap_count == 2:
            self.access_points[AP_2].start_ap(self.config_2g)
            self.access_points[AP_2].start_ap(self.config_5g)
            self.populate_bssid(AP_2, self.access_points[AP_2],
                                orig_network_list_5g, orig_network_list_2g)

    def _kill_processes(self, ap, daemon):
        """ Kill hostapd and dhcpd daemons

        Args:
            ap: AP to cleanup
            daemon: process to kill

        Returns: True/False if killing process is successful
        """
        self.log.info("Killing %s" % daemon)
        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
        if pids.stdout:
            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
        time.sleep(3)
        # The daemon is gone only if pidof no longer prints any pid.
        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
        if pids.stdout:
            return False
        return True

    def _cleanup_hostapd_and_dhcpd(self, count):
        """ Check if AP was cleaned up properly

        Kill hostapd and dhcpd processes if cleanup was not successful in the
        last run, then re-initialize the AccessPoint object in place.

        Args:
            count: AP to check

        Raises:
            RuntimeError: if hostapd or dhcpd could not be killed
        """
        ap = self.access_points[count]
        phy_ifaces = ap.interfaces.get_physical_interface()
        # Interfaces named after generated SSIDs ('2g_', '5g_', 'xg_' prefixes)
        # are taken as a sign that a previous run left hostapd behind.
        kill_hostapd = any(
            '2g_' in iface or '5g_' in iface or 'xg_' in iface
            for iface in phy_ifaces)

        if not kill_hostapd:
            return

        self.log.debug("Cleanup AP")
        if not self._kill_processes(ap, 'hostapd') or \
            not self._kill_processes(ap, 'dhcpd'):
            # Raising a bare string is itself a TypeError; raise a real
            # exception so the failure reason reaches the caller.
            raise RuntimeError("Failed to cleanup AP")

        ap.__init__(self.user_params['AccessPoint'][count])

    def _generate_legacy_ap_config(self, network_list):
        """Builds a hostapd config for one band from a list of networks.

        Args:
            network_list: List whose first entry holds the "channel" and whose
                          remaining entries are network dictionaries. The
                          first two entries are popped from the list.

        Returns: The config returned by hostapd_ap_preset.create_ap_preset.
        """
        bss_settings = []
        wlan_2g = self.access_points[AP_1].wlan_2g
        wlan_5g = self.access_points[AP_1].wlan_5g
        ap_settings = network_list.pop(0)
        # TODO:(bmahadev) This is a bug. We should not have to pop the first
        # network in the list and treat it as a separate case. Instead,
        # create_ap_preset() should be able to take NULL ssid and security and
        # build config based on the bss_Settings alone.
        hostapd_config_settings = network_list.pop(0)
        for network in network_list:
            if "password" in network:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            password=network["password"])))
            elif "wepKeys" in network:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            password=network["wepKeys"][0])))
            elif network["security"] == hostapd_constants.ENT_STRING:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            radius_server_ip=network["radius_server_ip"],
                            radius_server_port=network["radius_server_port"],
                            radius_server_secret=network[
                                "radius_server_secret"])))
            else:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"]))
        if "password" in hostapd_config_settings:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                security=hostapd_security.Security(
                    security_mode=hostapd_config_settings["security"],
                    password=hostapd_config_settings["password"]),
                bss_settings=bss_settings)
        elif "wepKeys" in hostapd_config_settings:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                security=hostapd_security.Security(
                    security_mode=hostapd_config_settings["security"],
                    password=hostapd_config_settings["wepKeys"][0]),
                bss_settings=bss_settings)
        else:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                bss_settings=bss_settings)
        return config

    def configure_packet_capture(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
        """Configure packet capture for 2G and 5G bands.

        Args:
            channel_5g: Channel to set the monitor mode to for 5G band.
            channel_2g: Channel to set the monitor mode to for 2G band.

        Raises:
            ValueError: if monitor mode could not be configured on a band.
        """
        # Rebinds the attribute from the controller list to its first entry.
        self.packet_capture = self.packet_capture[0]
        result = self.packet_capture.configure_monitor_mode(
            hostapd_constants.BAND_2G, channel_2g)
        if not result:
            raise ValueError("Failed to configure channel for 2G band")

        result = self.packet_capture.configure_monitor_mode(
            hostapd_constants.BAND_5G, channel_5g)
        if not result:
            raise ValueError("Failed to configure channel for 5G band.")

    @staticmethod
    def wifi_test_wrap(fn):
        """Decorator that reruns a failing test case.

        The wrapped test is run up to "wifi_auto_rerun" (user param, default
        3) additional times when it raises signals.TestFailure or returns
        False, with a teardown/setup cycle between attempts.

        Args:
            fn: Test method to wrap.

        Returns: The wrapping function.
        """

        def _safe_wrap_test_case(self, *args, **kwargs):
            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
                                    self.log_begin_time.replace(' ', '-'))
            self.test_id = test_id
            self.result_detail = ""
            tries = int(self.user_params.get("wifi_auto_rerun", 3))
            for ad in self.android_devices:
                ad.log_path = self.log_path
            for i in range(tries + 1):
                result = True
                if i > 0:
                    log_string = "[Test Case] RETRY:%s %s" % (i,
                                                              self.test_name)
                    self.log.info(log_string)
                    self._teardown_test(self.test_name)
                    self._setup_test(self.test_name)
                try:
                    result = fn(self, *args, **kwargs)
                except signals.TestFailure as e:
                    self.log.warn("Error msg: %s" % e)
                    # Previously assigned to an undefined name `signal`.
                    if self.result_detail:
                        e.details = self.result_detail
                    result = False
                except signals.TestSignal as e:
                    # Attach the collected detail before propagating.
                    if self.result_detail:
                        e.details = self.result_detail
                    raise
                except Exception as e:
                    self.log.exception(e)
                    asserts.fail(self.result_detail)
                # Anything other than an explicit False ends the retry loop.
                if result is not False:
                    break
            if result is not False:
                asserts.explicit_pass(self.result_detail)
            else:
                asserts.fail(self.result_detail)

        return _safe_wrap_test_case

    def _configure_flagged_features(self, ad, flagged_features):
        """Sets device_config feature flags on a device and verifies them.

        Args:
            ad: Android device on which the flags are set via adb shell.
            flagged_features: Dict mapping module name to a dict of
                              feature name -> "true"/"false" (any case).

        Raises:
            ValueError: if a flag value is neither "true" nor "false".
            RuntimeError: if reading the flag back does not return the value
                          that was written.
        """
        for module, features in flagged_features.items():
            for feature, raw_value in features.items():
                value = raw_value.lower()
                if value not in ("true", "false"):
                    raise ValueError("Invalid flag value for %s %s." %
                                     (module, feature))
                self.log.info("Setting feature flag %s %s to %s." %
                              (module, feature, value))
                adb_put_command = "device_config put %s %s %s" % (
                    module, feature, value)
                adb_get_command = "device_config get %s %s" % (module,
                                                               feature)
                ad.adb.shell(adb_put_command)
                value_from_get = ad.adb.shell(adb_get_command)

                if (not isinstance(value_from_get, str)
                        or value_from_get.lower() != value):
                    raise RuntimeError(
                        "Failed to set flag value to %s (now is %s) for %s %s."
                        % (value, value_from_get, module, feature))