• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2017 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17    Base Class for Defining Common WiFi Test Functionality
18"""
19
20import copy
21import os
22import time
23
24from acts import asserts
25from acts import context
26from acts import signals
27from acts import utils
28from acts.base_test import BaseTestClass
29from acts.controllers.ap_lib import hostapd_ap_preset
30from acts.controllers.ap_lib import hostapd_bss_settings
31from acts.controllers.ap_lib import hostapd_constants
32from acts.controllers.ap_lib import hostapd_security
33from acts.keys import Config
34from acts_contrib.test_utils.net import net_test_utils as nutils
35from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
36
37from mobly.base_test import STAGE_NAME_TEARDOWN_CLASS
38
39WifiEnums = wutils.WifiEnums
40AP_1 = 0
41AP_2 = 1
42MAX_AP_COUNT = 2
43
44
45class WifiBaseTest(BaseTestClass):
46    def __init__(self, configs):
47        super().__init__(configs)
48        self.enable_packet_log = False
49        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
50        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G
51        self.tcpdump_proc = []
52        self.packet_log_pid = {}
53
54    def setup_class(self):
55        if hasattr(self, 'attenuators') and self.attenuators:
56            for attenuator in self.attenuators:
57                attenuator.set_atten(0)
58        opt_param = ["country_code_file"]
59        self.unpack_userparams(opt_param_names=opt_param)
60        if self.enable_packet_log and hasattr(self, "packet_capture"):
61            self.packet_logger = self.packet_capture[0]
62            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
63            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
64        if hasattr(self, "android_devices"):
65            for ad in self.android_devices:
66                wutils.wifi_test_device_init(ad)
67                if hasattr(self, "country_code_file"):
68                    if isinstance(self.country_code_file, list):
69                        self.country_code_file = self.country_code_file[0]
70                    if not os.path.isfile(self.country_code_file):
71                        self.country_code_file = os.path.join(
72                            self.user_params[Config.key_config_path.value],
73                            self.country_code_file)
74                    self.country_code = utils.load_config(
75                        self.country_code_file)["country"]
76                else:
77                    self.country_code = WifiEnums.CountryCode.US
78                wutils.set_wifi_country_code(ad, self.country_code)
79
80    def setup_test(self):
81        if (hasattr(self, "android_devices")):
82            wutils.start_all_wlan_logs(self.android_devices)
83        self.tcpdump_proc = []
84        if hasattr(self, "android_devices"):
85            for ad in self.android_devices:
86                proc = nutils.start_tcpdump(ad, self.test_name)
87                self.tcpdump_proc.append((ad, proc))
88        if hasattr(self, "packet_logger"):
89            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
90                                                    self.test_name)
91
92    def teardown_test(self):
93        if (hasattr(self, "android_devices")):
94            wutils.stop_all_wlan_logs(self.android_devices)
95            for proc in self.tcpdump_proc:
96                nutils.stop_tcpdump(proc[0],
97                                    proc[1],
98                                    self.test_name,
99                                    pull_dump=False)
100            self.tcpdump_proc = []
101        if hasattr(self, "packet_logger") and self.packet_log_pid:
102            wutils.stop_pcap(self.packet_logger,
103                             self.packet_log_pid,
104                             test_status=True)
105            self.packet_log_pid = {}
106
107    def teardown_class(self):
108        begin_time = utils.get_current_epoch_time()
109        super().teardown_class()
110        for device in getattr(self, "fuchsia_devices", []):
111            device.take_bug_report(STAGE_NAME_TEARDOWN_CLASS, begin_time)
112
113    def on_fail(self, test_name, begin_time):
114        if hasattr(self, "android_devices"):
115            for ad in self.android_devices:
116                ad.take_bug_report(test_name, begin_time)
117                ad.cat_adb_log(test_name, begin_time)
118                wutils.get_ssrdumps(ad)
119            wutils.stop_all_wlan_logs(self.android_devices)
120            for ad in self.android_devices:
121                wutils.get_wlan_logs(ad)
122            for proc in self.tcpdump_proc:
123                nutils.stop_tcpdump(proc[0], proc[1], self.test_name)
124            self.tcpdump_proc = []
125        if hasattr(self, "packet_logger") and self.packet_log_pid:
126            wutils.stop_pcap(self.packet_logger,
127                             self.packet_log_pid,
128                             test_status=False)
129            self.packet_log_pid = {}
130
131        # Gets a wlan_device log and calls the generic device fail on DUT.
132        for device in getattr(self, "fuchsia_devices", []):
133            self.on_device_fail(device, test_name, begin_time)
134
135    def on_device_fail(self, device, test_name, begin_time):
136        """Gets a generic device DUT bug report.
137
138        This method takes a bug report if the device has the
139        'take_bug_report_on_fail' config value, and if the flag is true. This
140        method also power cycles if 'hard_reboot_on_fail' is True.
141
142        Args:
143            device: Generic device to gather logs from.
144            test_name: Name of the test that triggered this function.
145            begin_time: Logline format timestamp taken when the test started.
146        """
147        if (not hasattr(device, "take_bug_report_on_fail")
148                or device.take_bug_report_on_fail):
149            device.take_bug_report(test_name, begin_time)
150
151        if hasattr(device,
152                   "hard_reboot_on_fail") and device.hard_reboot_on_fail:
153            device.reboot(reboot_type='hard', testbed_pdus=self.pdu_devices)
154
155    def download_ap_logs(self):
156        """Downloads the DHCP and hostapad logs from the access_point.
157
158        Using the current TestClassContext and TestCaseContext this method pulls
159        the DHCP and hostapd logs and outputs them to the correct path.
160        """
161        current_path = context.get_current_context().get_full_output_path()
162        dhcp_full_out_path = os.path.join(current_path, "dhcp_log.txt")
163
164        dhcp_log = self.access_point.get_dhcp_logs()
165        if dhcp_log:
166            dhcp_log_file = open(dhcp_full_out_path, 'w')
167            dhcp_log_file.write(dhcp_log)
168            dhcp_log_file.close()
169
170        hostapd_logs = self.access_point.get_hostapd_logs()
171        for interface in hostapd_logs:
172            out_name = interface + "_hostapd_log.txt"
173            hostapd_full_out_path = os.path.join(current_path, out_name)
174            hostapd_log_file = open(hostapd_full_out_path, 'w')
175            hostapd_log_file.write(hostapd_logs[interface])
176            hostapd_log_file.close()
177
178    def get_psk_network(
179            self,
180            mirror_ap,
181            reference_networks,
182            hidden=False,
183            same_ssid=False,
184            security_mode=hostapd_constants.WPA2_STRING,
185            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
186            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
187            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
188            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
189        """Generates SSID and passphrase for a WPA2 network using random
190           generator.
191
192           Args:
193               mirror_ap: Boolean, determines if both APs use the same hostapd
194                          config or different configs.
195               reference_networks: List of PSK networks.
196               same_ssid: Boolean, determines if both bands on AP use the same
197                          SSID.
198               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
199               ssid_length_5g: Int, number of characters to use for 5G SSID.
200               passphrase_length_2g: Int, length of password for 2G network.
201               passphrase_length_5g: Int, length of password for 5G network.
202
203           Returns: A dict of 2G and 5G network lists for hostapd configuration.
204
205        """
206        network_dict_2g = {}
207        network_dict_5g = {}
208        ref_5g_security = security_mode
209        ref_2g_security = security_mode
210
211        if same_ssid:
212            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
213            ref_5g_ssid = ref_2g_ssid
214
215            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
216            ref_5g_passphrase = ref_2g_passphrase
217
218        else:
219            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
220            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
221
222            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
223            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)
224
225        network_dict_2g = {
226            "SSID": ref_2g_ssid,
227            "security": ref_2g_security,
228            "password": ref_2g_passphrase,
229            "hiddenSSID": hidden
230        }
231
232        network_dict_5g = {
233            "SSID": ref_5g_ssid,
234            "security": ref_5g_security,
235            "password": ref_5g_passphrase,
236            "hiddenSSID": hidden
237        }
238
239        ap = 0
240        for ap in range(MAX_AP_COUNT):
241            reference_networks.append({
242                "2g": copy.copy(network_dict_2g),
243                "5g": copy.copy(network_dict_5g)
244            })
245            if not mirror_ap:
246                break
247        return {"2g": network_dict_2g, "5g": network_dict_5g}
248
249    def get_open_network(self,
250                         mirror_ap,
251                         open_network,
252                         hidden=False,
253                         same_ssid=False,
254                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
255                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
256                         security_mode='none'):
257        """Generates SSIDs for a open network using a random generator.
258
259        Args:
260            mirror_ap: Boolean, determines if both APs use the same hostapd
261                       config or different configs.
262            open_network: List of open networks.
263            same_ssid: Boolean, determines if both bands on AP use the same
264                       SSID.
265            ssid_length_2g: Int, number of characters to use for 2G SSID.
266            ssid_length_5g: Int, number of characters to use for 5G SSID.
267            security_mode: 'none' for open and 'OWE' for WPA3 OWE.
268
269        Returns: A dict of 2G and 5G network lists for hostapd configuration.
270
271        """
272        network_dict_2g = {}
273        network_dict_5g = {}
274
275        if same_ssid:
276            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
277            open_5g_ssid = open_2g_ssid
278
279        else:
280            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
281            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
282
283        network_dict_2g = {
284            "SSID": open_2g_ssid,
285            "security": security_mode,
286            "hiddenSSID": hidden
287        }
288
289        network_dict_5g = {
290            "SSID": open_5g_ssid,
291            "security": security_mode,
292            "hiddenSSID": hidden
293        }
294
295        ap = 0
296        for ap in range(MAX_AP_COUNT):
297            open_network.append({
298                "2g": copy.copy(network_dict_2g),
299                "5g": copy.copy(network_dict_5g)
300            })
301            if not mirror_ap:
302                break
303        return {"2g": network_dict_2g, "5g": network_dict_5g}
304
305    def get_wep_network(
306            self,
307            mirror_ap,
308            networks,
309            hidden=False,
310            same_ssid=False,
311            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
312            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
313            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
314            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
315        """Generates SSID and passphrase for a WEP network using random
316           generator.
317
318           Args:
319               mirror_ap: Boolean, determines if both APs use the same hostapd
320                          config or different configs.
321               networks: List of WEP networks.
322               same_ssid: Boolean, determines if both bands on AP use the same
323                          SSID.
324               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
325               ssid_length_5g: Int, number of characters to use for 5G SSID.
326               passphrase_length_2g: Int, length of password for 2G network.
327               passphrase_length_5g: Int, length of password for 5G network.
328
329           Returns: A dict of 2G and 5G network lists for hostapd configuration.
330
331        """
332        network_dict_2g = {}
333        network_dict_5g = {}
334        ref_5g_security = hostapd_constants.WEP_STRING
335        ref_2g_security = hostapd_constants.WEP_STRING
336
337        if same_ssid:
338            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
339            ref_5g_ssid = ref_2g_ssid
340
341            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
342            ref_5g_passphrase = ref_2g_passphrase
343
344        else:
345            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
346            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
347
348            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
349            ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g)
350
351        network_dict_2g = {
352            "SSID": ref_2g_ssid,
353            "security": ref_2g_security,
354            "wepKeys": [ref_2g_passphrase] * 4,
355            "hiddenSSID": hidden
356        }
357
358        network_dict_5g = {
359            "SSID": ref_5g_ssid,
360            "security": ref_5g_security,
361            "wepKeys": [ref_2g_passphrase] * 4,
362            "hiddenSSID": hidden
363        }
364
365        ap = 0
366        for ap in range(MAX_AP_COUNT):
367            networks.append({
368                "2g": copy.copy(network_dict_2g),
369                "5g": copy.copy(network_dict_5g)
370            })
371            if not mirror_ap:
372                break
373        return {"2g": network_dict_2g, "5g": network_dict_5g}
374
375    def update_bssid(self, ap_instance, ap, network, band):
376        """Get bssid and update network dictionary.
377
378        Args:
379            ap_instance: Accesspoint index that was configured.
380            ap: Accesspoint object corresponding to ap_instance.
381            network: Network dictionary.
382            band: Wifi networks' band.
383
384        """
385        bssid = ap.get_bssid_from_ssid(network["SSID"], band)
386
387        if network["security"] == hostapd_constants.WPA2_STRING:
388            # TODO:(bamahadev) Change all occurances of reference_networks
389            # to wpa_networks.
390            self.reference_networks[ap_instance][band]["bssid"] = bssid
391        if network["security"] == hostapd_constants.WPA_STRING:
392            self.wpa_networks[ap_instance][band]["bssid"] = bssid
393        if network["security"] == hostapd_constants.WEP_STRING:
394            self.wep_networks[ap_instance][band]["bssid"] = bssid
395        if network["security"] == hostapd_constants.ENT_STRING:
396            if "bssid" not in self.ent_networks[ap_instance][band]:
397                self.ent_networks[ap_instance][band]["bssid"] = bssid
398            else:
399                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
400        if network["security"] == 'none':
401            self.open_network[ap_instance][band]["bssid"] = bssid
402
403    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
404        """Get bssid for a given SSID and add it to the network dictionary.
405
406        Args:
407            ap_instance: Accesspoint index that was configured.
408            ap: Accesspoint object corresponding to ap_instance.
409            networks_5g: List of 5g networks configured on the APs.
410            networks_2g: List of 2g networks configured on the APs.
411
412        """
413
414        if not (networks_5g or networks_2g):
415            return
416
417        for network in networks_5g:
418            if 'channel' in network:
419                continue
420            self.update_bssid(ap_instance, ap, network,
421                              hostapd_constants.BAND_5G)
422
423        for network in networks_2g:
424            if 'channel' in network:
425                continue
426            self.update_bssid(ap_instance, ap, network,
427                              hostapd_constants.BAND_2G)
428
429    def configure_openwrt_ap_and_start(
430            self,
431            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
432            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
433            channel_5g_ap2=None,
434            channel_2g_ap2=None,
435            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
436            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
437            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
438            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
439            mirror_ap=False,
440            hidden=False,
441            same_ssid=False,
442            open_network=False,
443            wpa1_network=False,
444            wpa_network=False,
445            wep_network=False,
446            ent_network=False,
447            ent_network_pwd=False,
448            owe_network=False,
449            sae_network=False,
450            saemixed_network=False,
451            radius_conf_2g=None,
452            radius_conf_5g=None,
453            radius_conf_pwd=None,
454            ap_count=1,
455            ieee80211w=None):
456        """Create, configure and start OpenWrt AP.
457
458        Args:
459            channel_5g: 5G channel to configure.
460            channel_2g: 2G channel to configure.
461            channel_5g_ap2: 5G channel to configure on AP2.
462            channel_2g_ap2: 2G channel to configure on AP2.
463            ssid_length_2g: Int, number of characters to use for 2G SSID.
464            passphrase_length_2g: Int, length of password for 2G network.
465            ssid_length_5g: Int, number of characters to use for 5G SSID.
466            passphrase_length_5g: Int, length of password for 5G network.
467            same_ssid: Boolean, determines if both bands on AP use the same SSID.
468            open_network: Boolean, to check if open network should be configured.
469            wpa_network: Boolean, to check if wpa network should be configured.
470            wep_network: Boolean, to check if wep network should be configured.
471            ent_network: Boolean, to check if ent network should be configured.
472            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
473            owe_network: Boolean, to check if owe network should be configured.
474            sae_network: Boolean, to check if sae network should be configured.
475            saemixed_network: Boolean, to check if saemixed network should be configured.
476            radius_conf_2g: dictionary with enterprise radius server details.
477            radius_conf_5g: dictionary with enterprise radius server details.
478            radius_conf_pwd: dictionary with enterprise radiuse server details.
479            ap_count: APs to configure.
480            ieee80211w:PMF to configure
481        """
482        if mirror_ap and ap_count == 1:
483            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
484        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
485            raise ValueError(
486                "ap_count cannot be 1 if channels of AP2 are provided.")
487        # we are creating a channel list for 2G and 5G bands. The list is of
488        # size 2 and this is based on the assumption that each testbed will have
489        # at most 2 APs.
490        if not channel_5g_ap2:
491            channel_5g_ap2 = channel_5g
492        if not channel_2g_ap2:
493            channel_2g_ap2 = channel_2g
494        channels_2g = [channel_2g, channel_2g_ap2]
495        channels_5g = [channel_5g, channel_5g_ap2]
496
497        self.reference_networks = []
498        self.wpa1_networks = []
499        self.wpa_networks = []
500        self.wep_networks = []
501        self.ent_networks = []
502        self.ent_networks_pwd = []
503        self.open_network = []
504        self.owe_networks = []
505        self.sae_networks = []
506        self.saemixed_networks = []
507        self.bssid_map = []
508        for i in range(ap_count):
509            network_list = []
510            if wpa1_network:
511                wpa1_dict = self.get_psk_network(mirror_ap, self.wpa1_networks,
512                                                 hidden, same_ssid,
513                                                 ssid_length_2g,
514                                                 ssid_length_5g,
515                                                 passphrase_length_2g,
516                                                 passphrase_length_5g)
517                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
518                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
519                wpa1_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
520                wpa1_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
521                self.wpa1_networks.append(wpa1_dict)
522                network_list.append(wpa1_dict)
523            if wpa_network:
524                wpa_dict = self.get_psk_network(mirror_ap,
525                                                self.reference_networks,
526                                                hidden, same_ssid,
527                                                ssid_length_2g, ssid_length_5g,
528                                                passphrase_length_2g,
529                                                passphrase_length_5g)
530                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
531                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
532                wpa_dict[hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
533                wpa_dict[hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
534                self.wpa_networks.append(wpa_dict)
535                network_list.append(wpa_dict)
536            if wep_network:
537                wep_dict = self.get_wep_network(mirror_ap, self.wep_networks,
538                                                hidden, same_ssid,
539                                                ssid_length_2g, ssid_length_5g)
540                network_list.append(wep_dict)
541            if ent_network:
542                ent_dict = self.get_open_network(mirror_ap, self.ent_networks,
543                                                 hidden, same_ssid,
544                                                 ssid_length_2g,
545                                                 ssid_length_5g)
546                ent_dict["2g"]["security"] = "wpa2"
547                ent_dict["2g"].update(radius_conf_2g)
548                ent_dict["5g"]["security"] = "wpa2"
549                ent_dict["5g"].update(radius_conf_5g)
550                network_list.append(ent_dict)
551            if ent_network_pwd:
552                ent_pwd_dict = self.get_open_network(mirror_ap,
553                                                     self.ent_networks_pwd,
554                                                     hidden, same_ssid,
555                                                     ssid_length_2g,
556                                                     ssid_length_5g)
557                ent_pwd_dict["2g"]["security"] = "wpa2"
558                ent_pwd_dict["2g"].update(radius_conf_pwd)
559                ent_pwd_dict["5g"]["security"] = "wpa2"
560                ent_pwd_dict["5g"].update(radius_conf_pwd)
561                network_list.append(ent_pwd_dict)
562            if open_network:
563                open_dict = self.get_open_network(mirror_ap, self.open_network,
564                                                  hidden, same_ssid,
565                                                  ssid_length_2g,
566                                                  ssid_length_5g)
567                network_list.append(open_dict)
568            if owe_network:
569                owe_dict = self.get_open_network(mirror_ap, self.owe_networks,
570                                                 hidden, same_ssid,
571                                                 ssid_length_2g,
572                                                 ssid_length_5g, "OWE")
573                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
574                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
575                network_list.append(owe_dict)
576            if sae_network:
577                sae_dict = self.get_psk_network(mirror_ap, self.sae_networks,
578                                                hidden, same_ssid,
579                                                hostapd_constants.SAE_KEY_MGMT,
580                                                ssid_length_2g, ssid_length_5g,
581                                                passphrase_length_2g,
582                                                passphrase_length_5g)
583                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
584                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
585                network_list.append(sae_dict)
586            if saemixed_network:
587                saemixed_dict = self.get_psk_network(
588                    mirror_ap, self.saemixed_networks, hidden, same_ssid,
589                    hostapd_constants.SAE_KEY_MGMT, ssid_length_2g,
590                    ssid_length_5g, passphrase_length_2g, passphrase_length_5g)
591                saemixed_dict[
592                    hostapd_constants.BAND_2G]["security"] = "sae-mixed"
593                saemixed_dict[
594                    hostapd_constants.BAND_5G]["security"] = "sae-mixed"
595                saemixed_dict[
596                    hostapd_constants.BAND_2G]["ieee80211w"] = ieee80211w
597                saemixed_dict[
598                    hostapd_constants.BAND_5G]["ieee80211w"] = ieee80211w
599                network_list.append(saemixed_dict)
600            self.access_points[i].configure_ap(network_list, channels_2g[i],
601                                               channels_5g[i])
602            self.access_points[i].start_ap()
603            self.bssid_map.append(
604                self.access_points[i].get_bssids_for_wifi_networks())
605            if mirror_ap:
606                self.access_points[i + 1].configure_ap(network_list,
607                                                       channels_2g[i + 1],
608                                                       channels_5g[i + 1])
609                self.access_points[i + 1].start_ap()
610                self.bssid_map.append(
611                    self.access_points[i + 1].get_bssids_for_wifi_networks())
612                break
613
614    def legacy_configure_ap_and_start(
615            self,
616            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
617            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
618            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
619            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
620            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
621            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
622            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
623            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
624            hidden=False,
625            same_ssid=False,
626            mirror_ap=True,
627            wpa_network=False,
628            wep_network=False,
629            ent_network=False,
630            radius_conf_2g=None,
631            radius_conf_5g=None,
632            ent_network_pwd=False,
633            radius_conf_pwd=None,
634            ap_count=1):
635
636        config_count = 1
637        count = 0
638
639        # For example, the NetworkSelector tests use 2 APs and require that
640        # both APs are not mirrored.
641        if not mirror_ap and ap_count == 1:
642            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")
643
644        if not mirror_ap:
645            config_count = ap_count
646
647        self.user_params["reference_networks"] = []
648        self.user_params["open_network"] = []
649        if wpa_network:
650            self.user_params["wpa_networks"] = []
651        if wep_network:
652            self.user_params["wep_networks"] = []
653        if ent_network:
654            self.user_params["ent_networks"] = []
655        if ent_network_pwd:
656            self.user_params["ent_networks_pwd"] = []
657
658        # kill hostapd & dhcpd if the cleanup was not successful
659        for i in range(len(self.access_points)):
660            self.log.debug("Check ap state and cleanup")
661            self._cleanup_hostapd_and_dhcpd(i)
662
663        for count in range(config_count):
664
665            network_list_2g = []
666            network_list_5g = []
667
668            orig_network_list_2g = []
669            orig_network_list_5g = []
670
671            network_list_2g.append({"channel": channel_2g})
672            network_list_5g.append({"channel": channel_5g})
673
674            networks_dict = self.get_psk_network(
675                mirror_ap,
676                self.user_params["reference_networks"],
677                hidden=hidden,
678                same_ssid=same_ssid)
679            self.reference_networks = self.user_params["reference_networks"]
680
681            network_list_2g.append(networks_dict["2g"])
682            network_list_5g.append(networks_dict["5g"])
683
684            # When same_ssid is set, only configure one set of WPA networks.
685            # We cannot have more than one set because duplicate interface names
686            # are not allowed.
687            # TODO(bmahadev): Provide option to select the type of network,
688            # instead of defaulting to WPA.
689            if not same_ssid:
690                networks_dict = self.get_open_network(
691                    mirror_ap,
692                    self.user_params["open_network"],
693                    hidden=hidden,
694                    same_ssid=same_ssid)
695                self.open_network = self.user_params["open_network"]
696
697                network_list_2g.append(networks_dict["2g"])
698                network_list_5g.append(networks_dict["5g"])
699
700                if wpa_network:
701                    networks_dict = self.get_psk_network(
702                        mirror_ap,
703                        self.user_params["wpa_networks"],
704                        hidden=hidden,
705                        same_ssid=same_ssid,
706                        security_mode=hostapd_constants.WPA_STRING)
707                    self.wpa_networks = self.user_params["wpa_networks"]
708
709                    network_list_2g.append(networks_dict["2g"])
710                    network_list_5g.append(networks_dict["5g"])
711
712                if wep_network:
713                    networks_dict = self.get_wep_network(
714                        mirror_ap,
715                        self.user_params["wep_networks"],
716                        hidden=hidden,
717                        same_ssid=same_ssid)
718                    self.wep_networks = self.user_params["wep_networks"]
719
720                    network_list_2g.append(networks_dict["2g"])
721                    network_list_5g.append(networks_dict["5g"])
722
723                if ent_network:
724                    networks_dict = self.get_open_network(
725                        mirror_ap,
726                        self.user_params["ent_networks"],
727                        hidden=hidden,
728                        same_ssid=same_ssid)
729                    networks_dict["2g"][
730                        "security"] = hostapd_constants.ENT_STRING
731                    networks_dict["2g"].update(radius_conf_2g)
732                    networks_dict["5g"][
733                        "security"] = hostapd_constants.ENT_STRING
734                    networks_dict["5g"].update(radius_conf_5g)
735                    self.ent_networks = self.user_params["ent_networks"]
736
737                    network_list_2g.append(networks_dict["2g"])
738                    network_list_5g.append(networks_dict["5g"])
739
740                if ent_network_pwd:
741                    networks_dict = self.get_open_network(
742                        mirror_ap,
743                        self.user_params["ent_networks_pwd"],
744                        hidden=hidden,
745                        same_ssid=same_ssid)
746                    networks_dict["2g"][
747                        "security"] = hostapd_constants.ENT_STRING
748                    networks_dict["2g"].update(radius_conf_pwd)
749                    networks_dict["5g"][
750                        "security"] = hostapd_constants.ENT_STRING
751                    networks_dict["5g"].update(radius_conf_pwd)
752                    self.ent_networks_pwd = self.user_params[
753                        "ent_networks_pwd"]
754
755                    network_list_2g.append(networks_dict["2g"])
756                    network_list_5g.append(networks_dict["5g"])
757
758            orig_network_list_5g = copy.copy(network_list_5g)
759            orig_network_list_2g = copy.copy(network_list_2g)
760
761            if len(network_list_5g) > 1:
762                self.config_5g = self._generate_legacy_ap_config(
763                    network_list_5g)
764            if len(network_list_2g) > 1:
765                self.config_2g = self._generate_legacy_ap_config(
766                    network_list_2g)
767
768            self.access_points[count].start_ap(self.config_2g)
769            self.access_points[count].start_ap(self.config_5g)
770            self.populate_bssid(count, self.access_points[count],
771                                orig_network_list_5g, orig_network_list_2g)
772
773        # Repeat configuration on the second router.
774        if mirror_ap and ap_count == 2:
775            self.access_points[AP_2].start_ap(self.config_2g)
776            self.access_points[AP_2].start_ap(self.config_5g)
777            self.populate_bssid(AP_2, self.access_points[AP_2],
778                                orig_network_list_5g, orig_network_list_2g)
779
780    def _kill_processes(self, ap, daemon):
781        """ Kill hostapd and dhcpd daemons
782
783        Args:
784            ap: AP to cleanup
785            daemon: process to kill
786
787        Returns: True/False if killing process is successful
788        """
789        self.log.info("Killing %s" % daemon)
790        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
791        if pids.stdout:
792            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
793        time.sleep(3)
794        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
795        if pids.stdout:
796            return False
797        return True
798
799    def _cleanup_hostapd_and_dhcpd(self, count):
800        """ Check if AP was cleaned up properly
801
802        Kill hostapd and dhcpd processes if cleanup was not successful in the
803        last run
804
805        Args:
806            count: AP to check
807
808        Returns:
809            New AccessPoint object if AP required cleanup
810
811        Raises:
812            Error: if the AccessPoint timed out to setup
813        """
814        ap = self.access_points[count]
815        phy_ifaces = ap.interfaces.get_physical_interface()
816        kill_hostapd = False
817        for iface in phy_ifaces:
818            if '2g_' in iface or '5g_' in iface or 'xg_' in iface:
819                kill_hostapd = True
820                break
821
822        if not kill_hostapd:
823            return
824
825        self.log.debug("Cleanup AP")
826        if not self._kill_processes(ap, 'hostapd') or \
827            not self._kill_processes(ap, 'dhcpd'):
828            raise ("Failed to cleanup AP")
829
830        ap.__init__(self.user_params['AccessPoint'][count])
831
832    def _generate_legacy_ap_config(self, network_list):
833        bss_settings = []
834        wlan_2g = self.access_points[AP_1].wlan_2g
835        wlan_5g = self.access_points[AP_1].wlan_5g
836        ap_settings = network_list.pop(0)
837        # TODO:(bmahadev) This is a bug. We should not have to pop the first
838        # network in the list and treat it as a separate case. Instead,
839        # create_ap_preset() should be able to take NULL ssid and security and
840        # build config based on the bss_Settings alone.
841        hostapd_config_settings = network_list.pop(0)
842        for network in network_list:
843            if "password" in network:
844                bss_settings.append(
845                    hostapd_bss_settings.BssSettings(
846                        name=network["SSID"],
847                        ssid=network["SSID"],
848                        hidden=network["hiddenSSID"],
849                        security=hostapd_security.Security(
850                            security_mode=network["security"],
851                            password=network["password"])))
852            elif "wepKeys" in network:
853                bss_settings.append(
854                    hostapd_bss_settings.BssSettings(
855                        name=network["SSID"],
856                        ssid=network["SSID"],
857                        hidden=network["hiddenSSID"],
858                        security=hostapd_security.Security(
859                            security_mode=network["security"],
860                            password=network["wepKeys"][0])))
861            elif network["security"] == hostapd_constants.ENT_STRING:
862                bss_settings.append(
863                    hostapd_bss_settings.BssSettings(
864                        name=network["SSID"],
865                        ssid=network["SSID"],
866                        hidden=network["hiddenSSID"],
867                        security=hostapd_security.Security(
868                            security_mode=network["security"],
869                            radius_server_ip=network["radius_server_ip"],
870                            radius_server_port=network["radius_server_port"],
871                            radius_server_secret=network[
872                                "radius_server_secret"])))
873            else:
874                bss_settings.append(
875                    hostapd_bss_settings.BssSettings(
876                        name=network["SSID"],
877                        ssid=network["SSID"],
878                        hidden=network["hiddenSSID"]))
879        if "password" in hostapd_config_settings:
880            config = hostapd_ap_preset.create_ap_preset(
881                iface_wlan_2g=wlan_2g,
882                iface_wlan_5g=wlan_5g,
883                channel=ap_settings["channel"],
884                ssid=hostapd_config_settings["SSID"],
885                hidden=hostapd_config_settings["hiddenSSID"],
886                security=hostapd_security.Security(
887                    security_mode=hostapd_config_settings["security"],
888                    password=hostapd_config_settings["password"]),
889                bss_settings=bss_settings)
890        elif "wepKeys" in hostapd_config_settings:
891            config = hostapd_ap_preset.create_ap_preset(
892                iface_wlan_2g=wlan_2g,
893                iface_wlan_5g=wlan_5g,
894                channel=ap_settings["channel"],
895                ssid=hostapd_config_settings["SSID"],
896                hidden=hostapd_config_settings["hiddenSSID"],
897                security=hostapd_security.Security(
898                    security_mode=hostapd_config_settings["security"],
899                    password=hostapd_config_settings["wepKeys"][0]),
900                bss_settings=bss_settings)
901        else:
902            config = hostapd_ap_preset.create_ap_preset(
903                iface_wlan_2g=wlan_2g,
904                iface_wlan_5g=wlan_5g,
905                channel=ap_settings["channel"],
906                ssid=hostapd_config_settings["SSID"],
907                hidden=hostapd_config_settings["hiddenSSID"],
908                bss_settings=bss_settings)
909        return config
910
911    def configure_packet_capture(
912            self,
913            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
914            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
915        """Configure packet capture for 2G and 5G bands.
916
917        Args:
918            channel_5g: Channel to set the monitor mode to for 5G band.
919            channel_2g: Channel to set the monitor mode to for 2G band.
920        """
921        self.packet_capture = self.packet_capture[0]
922        result = self.packet_capture.configure_monitor_mode(
923            hostapd_constants.BAND_2G, channel_2g)
924        if not result:
925            raise ValueError("Failed to configure channel for 2G band")
926
927        result = self.packet_capture.configure_monitor_mode(
928            hostapd_constants.BAND_5G, channel_5g)
929        if not result:
930            raise ValueError("Failed to configure channel for 5G band.")
931
932    @staticmethod
933    def wifi_test_wrap(fn):
934        def _safe_wrap_test_case(self, *args, **kwargs):
935            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
936                                    self.log_begin_time.replace(' ', '-'))
937            self.test_id = test_id
938            self.result_detail = ""
939            tries = int(self.user_params.get("wifi_auto_rerun", 3))
940            for ad in self.android_devices:
941                ad.log_path = self.log_path
942            for i in range(tries + 1):
943                result = True
944                if i > 0:
945                    log_string = "[Test Case] RETRY:%s %s" % (i,
946                                                              self.test_name)
947                    self.log.info(log_string)
948                    self._teardown_test(self.test_name)
949                    self._setup_test(self.test_name)
950                try:
951                    result = fn(self, *args, **kwargs)
952                except signals.TestFailure as e:
953                    self.log.warn("Error msg: %s" % e)
954                    if self.result_detail:
955                        signal.details = self.result_detail
956                    result = False
957                except signals.TestSignal:
958                    if self.result_detail:
959                        signal.details = self.result_detail
960                    raise
961                except Exception as e:
962                    self.log.exception(e)
963                    asserts.fail(self.result_detail)
964                if result is False:
965                    if i < tries:
966                        continue
967                else:
968                    break
969            if result is not False:
970                asserts.explicit_pass(self.result_detail)
971            else:
972                asserts.fail(self.result_detail)
973
974        return _safe_wrap_test_case
975