#!/usr/bin/env python3
#
#   Copyright 2017 - Google
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
    Base Class for Defining Common WiFi Test Functionality
"""

import copy
import itertools
import os
import time

import acts.controllers.access_point as ap

from acts import asserts
from acts import signals
from acts import utils
from acts.base_test import BaseTestClass
from acts.signals import TestSignal
from acts.controllers import android_device
from acts.controllers.access_point import AccessPoint
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib import hostapd_bss_settings
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib import hostapd_security
from acts.keys import Config
from acts_contrib.test_utils.net import net_test_utils as nutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils

AP_1 = 0
AP_2 = 1
MAX_AP_COUNT = 2


class WifiBaseTest(BaseTestClass):
    """Common setup, teardown and AP configuration helpers for WiFi tests."""

    def __init__(self, configs):
        super().__init__(configs)
        # Packet logging is off by default; subclasses may flip this flag and
        # override the monitored channels before setup_class() runs.
        self.enable_packet_log = False
        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G

    def setup_class(self):
        """Reset attenuators, unpack optional params and init the devices."""
        if hasattr(self, 'attenuators') and self.attenuators:
            for attenuator in self.attenuators:
                attenuator.set_atten(0)
        opt_param = ["pixel_models", "cnss_diag_file", "country_code_file"]
        self.unpack_userparams(opt_param_names=opt_param)
        if hasattr(self, "cnss_diag_file"):
            if isinstance(self.cnss_diag_file, list):
                self.cnss_diag_file = self.cnss_diag_file[0]
            # A path that is not an existing file is resolved against the
            # directory of the test config.
            if not os.path.isfile(self.cnss_diag_file):
                self.cnss_diag_file = os.path.join(
                    self.user_params[Config.key_config_path.value],
                    self.cnss_diag_file)
        if self.enable_packet_log and hasattr(self, "packet_capture"):
            self.packet_logger = self.packet_capture[0]
            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                wutils.wifi_test_device_init(ad)
                if hasattr(self, "country_code_file"):
                    if isinstance(self.country_code_file, list):
                        self.country_code_file = self.country_code_file[0]
                    if not os.path.isfile(self.country_code_file):
                        self.country_code_file = os.path.join(
                            self.user_params[Config.key_config_path.value],
                            self.country_code_file)
                    self.country_code = utils.load_config(
                        self.country_code_file)["country"]
                    wutils.set_wifi_country_code(ad, self.country_code)

    def setup_test(self):
        """Start cnss diag logging, tcpdump and packet capture for a test."""
        if (hasattr(self, "android_devices")
                and hasattr(self, "cnss_diag_file")
                and hasattr(self, "pixel_models")):
            wutils.start_cnss_diags(self.android_devices, self.cnss_diag_file,
                                    self.pixel_models)
        self.tcpdump_proc = []
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                proc = nutils.start_tcpdump(ad, self.test_name)
                self.tcpdump_proc.append((ad, proc))
        if hasattr(self, "packet_logger"):
            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
                                                    self.test_name)

    def teardown_test(self):
        """Stop the loggers started by setup_test without pulling dumps."""
        if (hasattr(self, "android_devices")
                and hasattr(self, "cnss_diag_file")
                and hasattr(self, "pixel_models")):
            wutils.stop_cnss_diags(self.android_devices, self.pixel_models)
        for proc in self.tcpdump_proc:
            nutils.stop_tcpdump(proc[0],
                                proc[1],
                                self.test_name,
                                pull_dump=False)
        self.tcpdump_proc = []
        if hasattr(self, "packet_logger") and self.packet_log_pid:
            wutils.stop_pcap(self.packet_logger,
                             self.packet_log_pid,
                             test_status=True)
            self.packet_log_pid = {}

    def on_fail(self, test_name, begin_time):
        """Collect bug reports, logs and captures after a failed test.

        Args:
            test_name: Name of the failed test.
            begin_time: Begin time of the failed test, forwarded to the
                device log collection calls.
        """
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.take_bug_report(test_name, begin_time)
                ad.cat_adb_log(test_name, begin_time)
                wutils.get_ssrdumps(ad)
            if (hasattr(self, "cnss_diag_file")
                    and hasattr(self, "pixel_models")):
                wutils.stop_cnss_diags(self.android_devices, self.pixel_models)
                for ad in self.android_devices:
                    wutils.get_cnss_diag_log(ad)
        for proc in self.tcpdump_proc:
            nutils.stop_tcpdump(proc[0], proc[1], self.test_name)
        self.tcpdump_proc = []
        if hasattr(self, "packet_logger") and self.packet_log_pid:
            wutils.stop_pcap(self.packet_logger,
                             self.packet_log_pid,
                             test_status=False)
            self.packet_log_pid = {}

    def get_psk_network(
            self,
            mirror_ap,
            reference_networks,
            hidden=False,
            same_ssid=False,
            security_mode=hostapd_constants.WPA2_STRING,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
        """Generates SSID and passphrase for a PSK network using random
           generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            reference_networks: List of PSK networks. One entry is appended,
                                or MAX_AP_COUNT entries when mirror_ap is set.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            security_mode: Value stored under "security" for both bands.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            passphrase_length_5g: Int, length of password for 5G network.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            # Both bands share one SSID and passphrase; only the 2G lengths
            # are used in this case.
            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = ref_2g_ssid

            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
            ref_5g_passphrase = ref_2g_passphrase

        else:
            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)

            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)

        network_dict_2g = {
            "SSID": ref_2g_ssid,
            "security": security_mode,
            "password": ref_2g_passphrase,
            "hiddenSSID": hidden
        }

        network_dict_5g = {
            "SSID": ref_5g_ssid,
            "security": security_mode,
            "password": ref_5g_passphrase,
            "hiddenSSID": hidden
        }

        # Shallow copies are stored so later edits to the returned dicts do
        # not leak into the caller's list.
        for _ in range(MAX_AP_COUNT):
            reference_networks.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def get_open_network(self,
                         mirror_ap,
                         open_network,
                         hidden=False,
                         same_ssid=False,
                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
                         security_mode='none'):
        """Generates SSIDs for a open network using a random generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            open_network: List of open networks. One entry is appended, or
                          MAX_AP_COUNT entries when mirror_ap is set.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            security_mode: 'none' for open and 'OWE' for WPA3 OWE.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        if same_ssid:
            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            open_5g_ssid = open_2g_ssid

        else:
            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)

        network_dict_2g = {
            "SSID": open_2g_ssid,
            "security": security_mode,
            "hiddenSSID": hidden
        }

        network_dict_5g = {
            "SSID": open_5g_ssid,
            "security": security_mode,
            "hiddenSSID": hidden
        }

        for _ in range(MAX_AP_COUNT):
            open_network.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def get_wep_network(
            self,
            mirror_ap,
            networks,
            hidden=False,
            same_ssid=False,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
        """Generates SSID and passphrase for a WEP network using random
           generator.

        Args:
            mirror_ap: Boolean, determines if both APs use the same hostapd
                       config or different configs.
            networks: List of WEP networks. One entry is appended, or
                      MAX_AP_COUNT entries when mirror_ap is set.
            hidden: Boolean, value stored under "hiddenSSID".
            same_ssid: Boolean, determines if both bands on AP use the same
                       SSID.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            passphrase_length_5g: Int, length of password for 5G network.

        Returns: A dict of 2G and 5G network lists for hostapd configuration.

        """
        ref_5g_security = hostapd_constants.WEP_STRING
        ref_2g_security = hostapd_constants.WEP_STRING

        if same_ssid:
            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_5g_ssid = ref_2g_ssid

            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
            ref_5g_passphrase = ref_2g_passphrase

        else:
            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)

            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
            ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g)

        network_dict_2g = {
            "SSID": ref_2g_ssid,
            "security": ref_2g_security,
            "wepKeys": [ref_2g_passphrase] * 4,
            "hiddenSSID": hidden
        }

        # The 5G network carries its own key; it equals the 2G key only when
        # same_ssid is set.
        network_dict_5g = {
            "SSID": ref_5g_ssid,
            "security": ref_5g_security,
            "wepKeys": [ref_5g_passphrase] * 4,
            "hiddenSSID": hidden
        }

        for _ in range(MAX_AP_COUNT):
            networks.append({
                "2g": copy.copy(network_dict_2g),
                "5g": copy.copy(network_dict_5g)
            })
            if not mirror_ap:
                break
        return {"2g": network_dict_2g, "5g": network_dict_5g}

    def update_bssid(self, ap_instance, ap, network, band):
        """Get bssid and update network dictionary.

        Args:
            ap_instance: Accesspoint index that was configured.
            ap: Accesspoint object corresponding to ap_instance.
            network: Network dictionary.
            band: Wifi networks' band.

        """
        bssid = ap.get_bssid_from_ssid(network["SSID"], band)

        if network["security"] == hostapd_constants.WPA2_STRING:
            # TODO:(bamahadev) Change all occurrences of reference_networks
            # to wpa_networks.
            self.reference_networks[ap_instance][band]["bssid"] = bssid
        if network["security"] == hostapd_constants.WPA_STRING:
            self.wpa_networks[ap_instance][band]["bssid"] = bssid
        if network["security"] == hostapd_constants.WEP_STRING:
            self.wep_networks[ap_instance][band]["bssid"] = bssid
        if network["security"] == hostapd_constants.ENT_STRING:
            # The first enterprise network seen for this AP/band fills
            # ent_networks; a later one is recorded in ent_networks_pwd.
            if "bssid" not in self.ent_networks[ap_instance][band]:
                self.ent_networks[ap_instance][band]["bssid"] = bssid
            else:
                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
        if network["security"] == 'none':
            self.open_network[ap_instance][band]["bssid"] = bssid

    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
        """Get bssid for a given SSID and add it to the network dictionary.

        Args:
            ap_instance: Accesspoint index that was configured.
            ap: Accesspoint object corresponding to ap_instance.
            networks_5g: List of 5g networks configured on the APs.
            networks_2g: List of 2g networks configured on the APs.

        """

        if not (networks_5g or networks_2g):
            return

        for network in networks_5g:
            # Entries holding only the channel setting are not networks.
            if 'channel' in network:
                continue
            self.update_bssid(ap_instance, ap, network,
                              hostapd_constants.BAND_5G)

        for network in networks_2g:
            if 'channel' in network:
                continue
            self.update_bssid(ap_instance, ap, network,
                              hostapd_constants.BAND_2G)

    def configure_openwrt_ap_and_start(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            channel_5g_ap2=None,
            channel_2g_ap2=None,
            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
            mirror_ap=False,
            hidden=False,
            same_ssid=False,
            open_network=False,
            wpa1_network=False,
            wpa_network=False,
            wep_network=False,
            ent_network=False,
            ent_network_pwd=False,
            owe_network=False,
            sae_network=False,
            radius_conf_2g=None,
            radius_conf_5g=None,
            radius_conf_pwd=None,
            ap_count=1):
        """Create, configure and start OpenWrt AP.

        Args:
            channel_5g: 5G channel to configure.
            channel_2g: 2G channel to configure.
            channel_5g_ap2: 5G channel to configure on AP2.
            channel_2g_ap2: 2G channel to configure on AP2.
            ssid_length_2g: Int, number of characters to use for 2G SSID.
            passphrase_length_2g: Int, length of password for 2G network.
            ssid_length_5g: Int, number of characters to use for 5G SSID.
            passphrase_length_5g: Int, length of password for 5G network.
            mirror_ap: Boolean, if set the second AP is configured with the
                same network list as the first one.
            hidden: Boolean, value stored under "hiddenSSID" of each network.
            same_ssid: Boolean, determines if both bands on AP use the same SSID.
            open_network: Boolean, to check if open network should be configured.
            wpa1_network: Boolean, to check if wpa1 network should be configured.
            wpa_network: Boolean, to check if wpa network should be configured.
            wep_network: Boolean, to check if wep network should be configured.
            ent_network: Boolean, to check if ent network should be configured.
            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
            owe_network: Boolean, to check if owe network should be configured.
            sae_network: Boolean, to check if sae network should be configured.
            radius_conf_2g: dictionary with enterprise radius server details.
            radius_conf_5g: dictionary with enterprise radius server details.
            radius_conf_pwd: dictionary with enterprise radius server details.
            ap_count: APs to configure.

        Raises:
            ValueError: if ap_count is 1 while mirror_ap is set or channels
                for AP2 are provided.
        """
        if mirror_ap and ap_count == 1:
            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
            raise ValueError(
                "ap_count cannot be 1 if channels of AP2 are provided.")
        # we are creating a channel list for 2G and 5G bands. The list is of
        # size 2 and this is based on the assumption that each testbed will have
        # at most 2 APs.
        if not channel_5g_ap2:
            channel_5g_ap2 = channel_5g
        if not channel_2g_ap2:
            channel_2g_ap2 = channel_2g
        channels_2g = [channel_2g, channel_2g_ap2]
        channels_5g = [channel_5g, channel_5g_ap2]

        self.reference_networks = []
        self.wpa1_networks = []
        self.wpa_networks = []
        self.wep_networks = []
        self.ent_networks = []
        self.ent_networks_pwd = []
        self.open_network = []
        self.owe_networks = []
        self.sae_networks = []
        self.bssid_map = []
        for i in range(ap_count):
            network_list = []
            if wpa1_network:
                # Lengths are passed by keyword: get_psk_network takes
                # security_mode before the length parameters, so positional
                # lengths would land in the wrong slots.
                wpa1_dict = self.get_psk_network(
                    mirror_ap,
                    self.wpa1_networks,
                    hidden,
                    same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
                self.wpa1_networks.append(wpa1_dict)
                network_list.append(wpa1_dict)
            if wpa_network:
                wpa_dict = self.get_psk_network(
                    mirror_ap,
                    self.reference_networks,
                    hidden,
                    same_ssid,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
                self.wpa_networks.append(wpa_dict)
                network_list.append(wpa_dict)
            if wep_network:
                wep_dict = self.get_wep_network(mirror_ap, self.wep_networks,
                                                hidden, same_ssid,
                                                ssid_length_2g, ssid_length_5g)
                network_list.append(wep_dict)
            if ent_network:
                ent_dict = self.get_open_network(mirror_ap, self.ent_networks,
                                                 hidden, same_ssid,
                                                 ssid_length_2g,
                                                 ssid_length_5g)
                ent_dict["2g"]["security"] = "wpa2"
                ent_dict["2g"].update(radius_conf_2g)
                ent_dict["5g"]["security"] = "wpa2"
                ent_dict["5g"].update(radius_conf_5g)
                network_list.append(ent_dict)
            if ent_network_pwd:
                ent_pwd_dict = self.get_open_network(mirror_ap,
                                                     self.ent_networks_pwd,
                                                     hidden, same_ssid,
                                                     ssid_length_2g,
                                                     ssid_length_5g)
                ent_pwd_dict["2g"]["security"] = "wpa2"
                ent_pwd_dict["2g"].update(radius_conf_pwd)
                ent_pwd_dict["5g"]["security"] = "wpa2"
                ent_pwd_dict["5g"].update(radius_conf_pwd)
                network_list.append(ent_pwd_dict)
            if open_network:
                open_dict = self.get_open_network(mirror_ap, self.open_network,
                                                  hidden, same_ssid,
                                                  ssid_length_2g,
                                                  ssid_length_5g)
                network_list.append(open_dict)
            if owe_network:
                owe_dict = self.get_open_network(mirror_ap, self.owe_networks,
                                                 hidden, same_ssid,
                                                 ssid_length_2g,
                                                 ssid_length_5g, "OWE")
                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
                network_list.append(owe_dict)
            if sae_network:
                sae_dict = self.get_psk_network(
                    mirror_ap,
                    self.sae_networks,
                    hidden,
                    same_ssid,
                    security_mode=hostapd_constants.SAE_KEY_MGMT,
                    ssid_length_2g=ssid_length_2g,
                    ssid_length_5g=ssid_length_5g,
                    passphrase_length_2g=passphrase_length_2g,
                    passphrase_length_5g=passphrase_length_5g)
                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
                network_list.append(sae_dict)
            self.access_points[i].configure_ap(network_list, channels_2g[i],
                                               channels_5g[i])
            self.access_points[i].start_ap()
            self.bssid_map.append(
                self.access_points[i].get_bssids_for_wifi_networks())
            if mirror_ap:
                # The mirrored AP reuses the network list just built, so no
                # further loop iteration is needed.
                self.access_points[i + 1].configure_ap(network_list,
                                                       channels_2g[i + 1],
                                                       channels_5g[i + 1])
                self.access_points[i + 1].start_ap()
                self.bssid_map.append(
                    self.access_points[i + 1].get_bssids_for_wifi_networks())
                break

    def legacy_configure_ap_and_start(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
            hidden=False,
            same_ssid=False,
            mirror_ap=True,
            wpa_network=False,
            wep_network=False,
            ent_network=False,
            radius_conf_2g=None,
            radius_conf_5g=None,
            ent_network_pwd=False,
            radius_conf_pwd=None,
            ap_count=1):
        """Generate hostapd configs and start the legacy access point(s).

        A WPA2 PSK network is always configured. Unless same_ssid is set, an
        open network and the optional WPA/WEP/enterprise networks are added
        as well. The generated network lists are stored both in
        self.user_params and on matching attributes of self.

        Args:
            channel_5g: 5G channel to configure.
            channel_2g: 2G channel to configure.
            max_2g_networks: Not used by this method.
            max_5g_networks: Not used by this method.
            ap_ssid_length_2g: Not used by this method.
            ap_passphrase_length_2g: Not used by this method.
            ap_ssid_length_5g: Not used by this method.
            ap_passphrase_length_5g: Not used by this method.
            hidden: Boolean, value stored under "hiddenSSID" of each network.
            same_ssid: Boolean, determines if both bands use the same SSID.
            mirror_ap: Boolean, if set one config is generated and, when
                ap_count is 2, repeated on the second AP.
            wpa_network: Boolean, to check if wpa network should be configured.
            wep_network: Boolean, to check if wep network should be configured.
            ent_network: Boolean, to check if ent network should be configured.
            radius_conf_2g: dictionary with enterprise radius server details.
            radius_conf_5g: dictionary with enterprise radius server details.
            ent_network_pwd: Boolean, to check if ent pwd network should be
                configured.
            radius_conf_pwd: dictionary with enterprise radius server details.
            ap_count: APs to configure.

        Raises:
            ValueError: if ap_count is 1 while mirror_ap is False.
        """

        config_count = 1
        count = 0

        # For example, the NetworkSelector tests use 2 APs and require that
        # both APs are not mirrored.
        if not mirror_ap and ap_count == 1:
            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")

        if not mirror_ap:
            config_count = ap_count

        self.user_params["reference_networks"] = []
        self.user_params["open_network"] = []
        if wpa_network:
            self.user_params["wpa_networks"] = []
        if wep_network:
            self.user_params["wep_networks"] = []
        if ent_network:
            self.user_params["ent_networks"] = []
        if ent_network_pwd:
            self.user_params["ent_networks_pwd"] = []

        # kill hostapd & dhcpd if the cleanup was not successful
        for i in range(len(self.access_points)):
            self.log.debug("Check ap state and cleanup")
            self._cleanup_hostapd_and_dhcpd(i)

        for count in range(config_count):

            network_list_2g = []
            network_list_5g = []

            orig_network_list_2g = []
            orig_network_list_5g = []

            # The first entry of each list carries the channel; it is popped
            # off again by _generate_legacy_ap_config().
            network_list_2g.append({"channel": channel_2g})
            network_list_5g.append({"channel": channel_5g})

            networks_dict = self.get_psk_network(
                mirror_ap,
                self.user_params["reference_networks"],
                hidden=hidden,
                same_ssid=same_ssid)
            self.reference_networks = self.user_params["reference_networks"]

            network_list_2g.append(networks_dict["2g"])
            network_list_5g.append(networks_dict["5g"])

            # When same_ssid is set, only configure one set of WPA networks.
            # We cannot have more than one set because duplicate interface names
            # are not allowed.
            # TODO(bmahadev): Provide option to select the type of network,
            # instead of defaulting to WPA.
            if not same_ssid:
                networks_dict = self.get_open_network(
                    mirror_ap,
                    self.user_params["open_network"],
                    hidden=hidden,
                    same_ssid=same_ssid)
                self.open_network = self.user_params["open_network"]

                network_list_2g.append(networks_dict["2g"])
                network_list_5g.append(networks_dict["5g"])

                if wpa_network:
                    networks_dict = self.get_psk_network(
                        mirror_ap,
                        self.user_params["wpa_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid,
                        security_mode=hostapd_constants.WPA_STRING)
                    self.wpa_networks = self.user_params["wpa_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if wep_network:
                    networks_dict = self.get_wep_network(
                        mirror_ap,
                        self.user_params["wep_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    self.wep_networks = self.user_params["wep_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if ent_network:
                    networks_dict = self.get_open_network(
                        mirror_ap,
                        self.user_params["ent_networks"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    networks_dict["2g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["2g"].update(radius_conf_2g)
                    networks_dict["5g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["5g"].update(radius_conf_5g)
                    self.ent_networks = self.user_params["ent_networks"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

                if ent_network_pwd:
                    networks_dict = self.get_open_network(
                        mirror_ap,
                        self.user_params["ent_networks_pwd"],
                        hidden=hidden,
                        same_ssid=same_ssid)
                    networks_dict["2g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["2g"].update(radius_conf_pwd)
                    networks_dict["5g"][
                        "security"] = hostapd_constants.ENT_STRING
                    networks_dict["5g"].update(radius_conf_pwd)
                    self.ent_networks_pwd = self.user_params[
                        "ent_networks_pwd"]

                    network_list_2g.append(networks_dict["2g"])
                    network_list_5g.append(networks_dict["5g"])

            # _generate_legacy_ap_config() consumes entries from the list it
            # is given, so keep copies for populate_bssid().
            orig_network_list_5g = copy.copy(network_list_5g)
            orig_network_list_2g = copy.copy(network_list_2g)

            if len(network_list_5g) > 1:
                self.config_5g = self._generate_legacy_ap_config(
                    network_list_5g)
            if len(network_list_2g) > 1:
                self.config_2g = self._generate_legacy_ap_config(
                    network_list_2g)

            self.access_points[count].start_ap(self.config_2g)
            self.access_points[count].start_ap(self.config_5g)
            self.populate_bssid(count, self.access_points[count],
                                orig_network_list_5g, orig_network_list_2g)

        # Repeat configuration on the second router.
        if mirror_ap and ap_count == 2:
            self.access_points[AP_2].start_ap(self.config_2g)
            self.access_points[AP_2].start_ap(self.config_5g)
            self.populate_bssid(AP_2, self.access_points[AP_2],
                                orig_network_list_5g, orig_network_list_2g)

    def _kill_processes(self, ap, daemon):
        """ Kill hostapd and dhcpd daemons

        Args:
            ap: AP to cleanup
            daemon: process to kill

        Returns: True/False if killing process is successful
        """
        self.log.info("Killing %s" % daemon)
        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
        if pids.stdout:
            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
        time.sleep(3)
        # Success means no process with that name is left afterwards.
        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
        if pids.stdout:
            return False
        return True

    def _cleanup_hostapd_and_dhcpd(self, count):
        """ Check if AP was cleaned up properly

        Kill hostapd and dhcpd processes if cleanup was not successful in the
        last run, then re-initialize the AccessPoint object in place.

        Args:
            count: AP to check

        Raises:
            RuntimeError: if hostapd or dhcpd could not be killed
        """
        ap = self.access_points[count]
        phy_ifaces = ap.interfaces.get_physical_interface()
        # Interface names containing the SSID prefixes generated by this
        # class indicate leftovers from a previous run.
        kill_hostapd = False
        for iface in phy_ifaces:
            if '2g_' in iface or '5g_' in iface or 'xg_' in iface:
                kill_hostapd = True
                break

        if not kill_hostapd:
            return

        self.log.debug("Cleanup AP")
        if not self._kill_processes(ap, 'hostapd') or \
                not self._kill_processes(ap, 'dhcpd'):
            # A bare string cannot be raised; use a real exception type.
            raise RuntimeError("Failed to cleanup AP")

        ap.__init__(self.user_params['AccessPoint'][count])

    def _generate_legacy_ap_config(self, network_list):
        """Build a hostapd AP preset from a list of network dictionaries.

        Args:
            network_list: List whose first entry holds the "channel", whose
                second entry is the primary network and whose remaining
                entries become additional BSS settings. The first two
                entries are removed from the list.

        Returns: The config created by hostapd_ap_preset.create_ap_preset().
        """
        bss_settings = []
        wlan_2g = self.access_points[AP_1].wlan_2g
        wlan_5g = self.access_points[AP_1].wlan_5g
        ap_settings = network_list.pop(0)
        # TODO:(bmahadev) This is a bug. We should not have to pop the first
        # network in the list and treat it as a separate case. Instead,
        # create_ap_preset() should be able to take NULL ssid and security and
        # build config based on the bss_Settings alone.
        hostapd_config_settings = network_list.pop(0)
        for network in network_list:
            if "password" in network:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            password=network["password"])))
            elif "wepKeys" in network:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            password=network["wepKeys"][0])))
            elif network["security"] == hostapd_constants.ENT_STRING:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"],
                        security=hostapd_security.Security(
                            security_mode=network["security"],
                            radius_server_ip=network["radius_server_ip"],
                            radius_server_port=network["radius_server_port"],
                            radius_server_secret=network[
                                "radius_server_secret"])))
            else:
                bss_settings.append(
                    hostapd_bss_settings.BssSettings(
                        name=network["SSID"],
                        ssid=network["SSID"],
                        hidden=network["hiddenSSID"]))
        if "password" in hostapd_config_settings:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                security=hostapd_security.Security(
                    security_mode=hostapd_config_settings["security"],
                    password=hostapd_config_settings["password"]),
                bss_settings=bss_settings)
        elif "wepKeys" in hostapd_config_settings:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                security=hostapd_security.Security(
                    security_mode=hostapd_config_settings["security"],
                    password=hostapd_config_settings["wepKeys"][0]),
                bss_settings=bss_settings)
        else:
            config = hostapd_ap_preset.create_ap_preset(
                iface_wlan_2g=wlan_2g,
                iface_wlan_5g=wlan_5g,
                channel=ap_settings["channel"],
                ssid=hostapd_config_settings["SSID"],
                hidden=hostapd_config_settings["hiddenSSID"],
                bss_settings=bss_settings)
        return config

    def configure_packet_capture(
            self,
            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
        """Configure packet capture for 2G and 5G bands.

        Replaces self.packet_capture with its first element before
        configuring it.

        Args:
            channel_5g: Channel to set the monitor mode to for 5G band.
            channel_2g: Channel to set the monitor mode to for 2G band.

        Raises:
            ValueError: if configuring monitor mode fails for either band.
        """
        self.packet_capture = self.packet_capture[0]
        result = self.packet_capture.configure_monitor_mode(
            hostapd_constants.BAND_2G, channel_2g)
        if not result:
            raise ValueError("Failed to configure channel for 2G band")

        result = self.packet_capture.configure_monitor_mode(
            hostapd_constants.BAND_5G, channel_5g)
        if not result:
            raise ValueError("Failed to configure channel for 5G band.")

    @staticmethod
    def wifi_test_wrap(fn):
        """Decorator that reruns a failing test case before reporting it.

        The wrapped test is attempted up to 1 + "wifi_auto_rerun" times
        (user param, default 3). A signals.TestFailure or a return value of
        False counts as a failed attempt; any other test signal is re-raised
        immediately and any other exception fails the test at once.

        Args:
            fn: Test method to wrap.

        Returns: The wrapping function.
        """

        def _safe_wrap_test_case(self, *args, **kwargs):
            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
                                    self.log_begin_time.replace(' ', '-'))
            self.test_id = test_id
            self.result_detail = ""
            tries = int(self.user_params.get("wifi_auto_rerun", 3))
            for ad in self.android_devices:
                ad.log_path = self.log_path
            for i in range(tries + 1):
                result = True
                if i > 0:
                    log_string = "[Test Case] RETRY:%s %s" % (i,
                                                              self.test_name)
                    self.log.info(log_string)
                    self._teardown_test(self.test_name)
                    self._setup_test(self.test_name)
                try:
                    result = fn(self, *args, **kwargs)
                except signals.TestFailure as e:
                    self.log.warn("Error msg: %s" % e)
                    if self.result_detail:
                        # Attach the collected detail to the caught signal.
                        e.details = self.result_detail
                    result = False
                except signals.TestSignal as e:
                    if self.result_detail:
                        e.details = self.result_detail
                    raise
                except Exception as e:
                    self.log.exception(e)
                    asserts.fail(self.result_detail)
                if result is False:
                    if i < tries:
                        continue
                else:
                    break
            if result is not False:
                asserts.explicit_pass(self.result_detail)
            else:
                asserts.fail(self.result_detail)

        return _safe_wrap_test_case