• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2017 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17    Base Class for Defining Common WiFi Test Functionality
18"""
19
20import copy
21import itertools
22import os
23import time
24
25import acts.controllers.access_point as ap
26
27from acts import asserts
28from acts import signals
29from acts import utils
30from acts.base_test import BaseTestClass
31from acts.signals import TestSignal
32from acts.controllers import android_device
33from acts.controllers.access_point import AccessPoint
34from acts.controllers.ap_lib import hostapd_ap_preset
35from acts.controllers.ap_lib import hostapd_bss_settings
36from acts.controllers.ap_lib import hostapd_constants
37from acts.controllers.ap_lib import hostapd_security
38from acts.keys import Config
39from acts_contrib.test_utils.net import net_test_utils as nutils
40from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
41
42AP_1 = 0
43AP_2 = 1
44MAX_AP_COUNT = 2
45
46
47class WifiBaseTest(BaseTestClass):
48    def __init__(self, configs):
49        super().__init__(configs)
50        self.enable_packet_log = False
51        self.packet_log_2g = hostapd_constants.AP_DEFAULT_CHANNEL_2G
52        self.packet_log_5g = hostapd_constants.AP_DEFAULT_CHANNEL_5G
53
54    def setup_class(self):
55        if hasattr(self, 'attenuators') and self.attenuators:
56            for attenuator in self.attenuators:
57                attenuator.set_atten(0)
58        opt_param = ["pixel_models", "cnss_diag_file", "country_code_file"]
59        self.unpack_userparams(opt_param_names=opt_param)
60        if hasattr(self, "cnss_diag_file"):
61            if isinstance(self.cnss_diag_file, list):
62                self.cnss_diag_file = self.cnss_diag_file[0]
63            if not os.path.isfile(self.cnss_diag_file):
64                self.cnss_diag_file = os.path.join(
65                    self.user_params[Config.key_config_path.value],
66                    self.cnss_diag_file)
67        if self.enable_packet_log and hasattr(self, "packet_capture"):
68            self.packet_logger = self.packet_capture[0]
69            self.packet_logger.configure_monitor_mode("2G", self.packet_log_2g)
70            self.packet_logger.configure_monitor_mode("5G", self.packet_log_5g)
71        if hasattr(self, "android_devices"):
72            for ad in self.android_devices:
73                wutils.wifi_test_device_init(ad)
74                if hasattr(self, "country_code_file"):
75                    if isinstance(self.country_code_file, list):
76                        self.country_code_file = self.country_code_file[0]
77                    if not os.path.isfile(self.country_code_file):
78                        self.country_code_file = os.path.join(
79                            self.user_params[Config.key_config_path.value],
80                            self.country_code_file)
81                    self.country_code = utils.load_config(
82                        self.country_code_file)["country"]
83                    wutils.set_wifi_country_code(ad, self.country_code)
84
85    def setup_test(self):
86        if (hasattr(self, "android_devices")
87                and hasattr(self, "cnss_diag_file")
88                and hasattr(self, "pixel_models")):
89            wutils.start_cnss_diags(self.android_devices, self.cnss_diag_file,
90                                    self.pixel_models)
91        self.tcpdump_proc = []
92        if hasattr(self, "android_devices"):
93            for ad in self.android_devices:
94                proc = nutils.start_tcpdump(ad, self.test_name)
95                self.tcpdump_proc.append((ad, proc))
96        if hasattr(self, "packet_logger"):
97            self.packet_log_pid = wutils.start_pcap(self.packet_logger, 'dual',
98                                                    self.test_name)
99
100    def teardown_test(self):
101        if (hasattr(self, "android_devices")
102                and hasattr(self, "cnss_diag_file")
103                and hasattr(self, "pixel_models")):
104            wutils.stop_cnss_diags(self.android_devices, self.pixel_models)
105            for proc in self.tcpdump_proc:
106                nutils.stop_tcpdump(proc[0],
107                                    proc[1],
108                                    self.test_name,
109                                    pull_dump=False)
110            self.tcpdump_proc = []
111        if hasattr(self, "packet_logger") and self.packet_log_pid:
112            wutils.stop_pcap(self.packet_logger,
113                             self.packet_log_pid,
114                             test_status=True)
115            self.packet_log_pid = {}
116
117    def on_fail(self, test_name, begin_time):
118        if hasattr(self, "android_devices"):
119            for ad in self.android_devices:
120                ad.take_bug_report(test_name, begin_time)
121                ad.cat_adb_log(test_name, begin_time)
122                wutils.get_ssrdumps(ad)
123            if (hasattr(self, "cnss_diag_file")
124                    and hasattr(self, "pixel_models")):
125                wutils.stop_cnss_diags(self.android_devices, self.pixel_models)
126                for ad in self.android_devices:
127                    wutils.get_cnss_diag_log(ad)
128            for proc in self.tcpdump_proc:
129                nutils.stop_tcpdump(proc[0], proc[1], self.test_name)
130            self.tcpdump_proc = []
131        if hasattr(self, "packet_logger") and self.packet_log_pid:
132            wutils.stop_pcap(self.packet_logger,
133                             self.packet_log_pid,
134                             test_status=False)
135            self.packet_log_pid = {}
136
137    def get_psk_network(
138            self,
139            mirror_ap,
140            reference_networks,
141            hidden=False,
142            same_ssid=False,
143            security_mode=hostapd_constants.WPA2_STRING,
144            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
145            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
146            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
147            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
148        """Generates SSID and passphrase for a WPA2 network using random
149           generator.
150
151           Args:
152               mirror_ap: Boolean, determines if both APs use the same hostapd
153                          config or different configs.
154               reference_networks: List of PSK networks.
155               same_ssid: Boolean, determines if both bands on AP use the same
156                          SSID.
157               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
158               ssid_length_5g: Int, number of characters to use for 5G SSID.
159               passphrase_length_2g: Int, length of password for 2G network.
160               passphrase_length_5g: Int, length of password for 5G network.
161
162           Returns: A dict of 2G and 5G network lists for hostapd configuration.
163
164        """
165        network_dict_2g = {}
166        network_dict_5g = {}
167        ref_5g_security = security_mode
168        ref_2g_security = security_mode
169
170        if same_ssid:
171            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
172            ref_5g_ssid = ref_2g_ssid
173
174            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
175            ref_5g_passphrase = ref_2g_passphrase
176
177        else:
178            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
179            ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
180
181            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
182            ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)
183
184        network_dict_2g = {
185            "SSID": ref_2g_ssid,
186            "security": ref_2g_security,
187            "password": ref_2g_passphrase,
188            "hiddenSSID": hidden
189        }
190
191        network_dict_5g = {
192            "SSID": ref_5g_ssid,
193            "security": ref_5g_security,
194            "password": ref_5g_passphrase,
195            "hiddenSSID": hidden
196        }
197
198        ap = 0
199        for ap in range(MAX_AP_COUNT):
200            reference_networks.append({
201                "2g": copy.copy(network_dict_2g),
202                "5g": copy.copy(network_dict_5g)
203            })
204            if not mirror_ap:
205                break
206        return {"2g": network_dict_2g, "5g": network_dict_5g}
207
208    def get_open_network(self,
209                         mirror_ap,
210                         open_network,
211                         hidden=False,
212                         same_ssid=False,
213                         ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
214                         ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
215                         security_mode='none'):
216        """Generates SSIDs for a open network using a random generator.
217
218        Args:
219            mirror_ap: Boolean, determines if both APs use the same hostapd
220                       config or different configs.
221            open_network: List of open networks.
222            same_ssid: Boolean, determines if both bands on AP use the same
223                       SSID.
224            ssid_length_2g: Int, number of characters to use for 2G SSID.
225            ssid_length_5g: Int, number of characters to use for 5G SSID.
226            security_mode: 'none' for open and 'OWE' for WPA3 OWE.
227
228        Returns: A dict of 2G and 5G network lists for hostapd configuration.
229
230        """
231        network_dict_2g = {}
232        network_dict_5g = {}
233
234        if same_ssid:
235            open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
236            open_5g_ssid = open_2g_ssid
237
238        else:
239            open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
240            open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
241
242        network_dict_2g = {
243            "SSID": open_2g_ssid,
244            "security": security_mode,
245            "hiddenSSID": hidden
246        }
247
248        network_dict_5g = {
249            "SSID": open_5g_ssid,
250            "security": security_mode,
251            "hiddenSSID": hidden
252        }
253
254        ap = 0
255        for ap in range(MAX_AP_COUNT):
256            open_network.append({
257                "2g": copy.copy(network_dict_2g),
258                "5g": copy.copy(network_dict_5g)
259            })
260            if not mirror_ap:
261                break
262        return {"2g": network_dict_2g, "5g": network_dict_5g}
263
264    def get_wep_network(
265            self,
266            mirror_ap,
267            networks,
268            hidden=False,
269            same_ssid=False,
270            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
271            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
272            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
273            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G):
274        """Generates SSID and passphrase for a WEP network using random
275           generator.
276
277           Args:
278               mirror_ap: Boolean, determines if both APs use the same hostapd
279                          config or different configs.
280               networks: List of WEP networks.
281               same_ssid: Boolean, determines if both bands on AP use the same
282                          SSID.
283               ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
284               ssid_length_5g: Int, number of characters to use for 5G SSID.
285               passphrase_length_2g: Int, length of password for 2G network.
286               passphrase_length_5g: Int, length of password for 5G network.
287
288           Returns: A dict of 2G and 5G network lists for hostapd configuration.
289
290        """
291        network_dict_2g = {}
292        network_dict_5g = {}
293        ref_5g_security = hostapd_constants.WEP_STRING
294        ref_2g_security = hostapd_constants.WEP_STRING
295
296        if same_ssid:
297            ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
298            ref_5g_ssid = ref_2g_ssid
299
300            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
301            ref_5g_passphrase = ref_2g_passphrase
302
303        else:
304            ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
305            ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g)
306
307            ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
308            ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g)
309
310        network_dict_2g = {
311            "SSID": ref_2g_ssid,
312            "security": ref_2g_security,
313            "wepKeys": [ref_2g_passphrase] * 4,
314            "hiddenSSID": hidden
315        }
316
317        network_dict_5g = {
318            "SSID": ref_5g_ssid,
319            "security": ref_5g_security,
320            "wepKeys": [ref_2g_passphrase] * 4,
321            "hiddenSSID": hidden
322        }
323
324        ap = 0
325        for ap in range(MAX_AP_COUNT):
326            networks.append({
327                "2g": copy.copy(network_dict_2g),
328                "5g": copy.copy(network_dict_5g)
329            })
330            if not mirror_ap:
331                break
332        return {"2g": network_dict_2g, "5g": network_dict_5g}
333
334    def update_bssid(self, ap_instance, ap, network, band):
335        """Get bssid and update network dictionary.
336
337        Args:
338            ap_instance: Accesspoint index that was configured.
339            ap: Accesspoint object corresponding to ap_instance.
340            network: Network dictionary.
341            band: Wifi networks' band.
342
343        """
344        bssid = ap.get_bssid_from_ssid(network["SSID"], band)
345
346        if network["security"] == hostapd_constants.WPA2_STRING:
347            # TODO:(bamahadev) Change all occurances of reference_networks
348            # to wpa_networks.
349            self.reference_networks[ap_instance][band]["bssid"] = bssid
350        if network["security"] == hostapd_constants.WPA_STRING:
351            self.wpa_networks[ap_instance][band]["bssid"] = bssid
352        if network["security"] == hostapd_constants.WEP_STRING:
353            self.wep_networks[ap_instance][band]["bssid"] = bssid
354        if network["security"] == hostapd_constants.ENT_STRING:
355            if "bssid" not in self.ent_networks[ap_instance][band]:
356                self.ent_networks[ap_instance][band]["bssid"] = bssid
357            else:
358                self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid
359        if network["security"] == 'none':
360            self.open_network[ap_instance][band]["bssid"] = bssid
361
362    def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
363        """Get bssid for a given SSID and add it to the network dictionary.
364
365        Args:
366            ap_instance: Accesspoint index that was configured.
367            ap: Accesspoint object corresponding to ap_instance.
368            networks_5g: List of 5g networks configured on the APs.
369            networks_2g: List of 2g networks configured on the APs.
370
371        """
372
373        if not (networks_5g or networks_2g):
374            return
375
376        for network in networks_5g:
377            if 'channel' in network:
378                continue
379            self.update_bssid(ap_instance, ap, network,
380                              hostapd_constants.BAND_5G)
381
382        for network in networks_2g:
383            if 'channel' in network:
384                continue
385            self.update_bssid(ap_instance, ap, network,
386                              hostapd_constants.BAND_2G)
387
388    def configure_openwrt_ap_and_start(
389            self,
390            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
391            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
392            channel_5g_ap2=None,
393            channel_2g_ap2=None,
394            ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
395            passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
396            ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
397            passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
398            mirror_ap=False,
399            hidden=False,
400            same_ssid=False,
401            open_network=False,
402            wpa1_network=False,
403            wpa_network=False,
404            wep_network=False,
405            ent_network=False,
406            ent_network_pwd=False,
407            owe_network=False,
408            sae_network=False,
409            radius_conf_2g=None,
410            radius_conf_5g=None,
411            radius_conf_pwd=None,
412            ap_count=1):
413        """Create, configure and start OpenWrt AP.
414
415        Args:
416            channel_5g: 5G channel to configure.
417            channel_2g: 2G channel to configure.
418            channel_5g_ap2: 5G channel to configure on AP2.
419            channel_2g_ap2: 2G channel to configure on AP2.
420            ssid_length_2g: Int, number of characters to use for 2G SSID.
421            passphrase_length_2g: Int, length of password for 2G network.
422            ssid_length_5g: Int, number of characters to use for 5G SSID.
423            passphrase_length_5g: Int, length of password for 5G network.
424            same_ssid: Boolean, determines if both bands on AP use the same SSID.
425            open_network: Boolean, to check if open network should be configured.
426            wpa_network: Boolean, to check if wpa network should be configured.
427            wep_network: Boolean, to check if wep network should be configured.
428            ent_network: Boolean, to check if ent network should be configured.
429            ent_network_pwd: Boolean, to check if ent pwd network should be configured.
430            owe_network: Boolean, to check if owe network should be configured.
431            sae_network: Boolean, to check if sae network should be configured.
432            radius_conf_2g: dictionary with enterprise radius server details.
433            radius_conf_5g: dictionary with enterprise radius server details.
434            radius_conf_pwd: dictionary with enterprise radiuse server details.
435            ap_count: APs to configure.
436        """
437        if mirror_ap and ap_count == 1:
438            raise ValueError("ap_count cannot be 1 if mirror_ap is True.")
439        if (channel_5g_ap2 or channel_2g_ap2) and ap_count == 1:
440            raise ValueError(
441                "ap_count cannot be 1 if channels of AP2 are provided.")
442        # we are creating a channel list for 2G and 5G bands. The list is of
443        # size 2 and this is based on the assumption that each testbed will have
444        # at most 2 APs.
445        if not channel_5g_ap2:
446            channel_5g_ap2 = channel_5g
447        if not channel_2g_ap2:
448            channel_2g_ap2 = channel_2g
449        channels_2g = [channel_2g, channel_2g_ap2]
450        channels_5g = [channel_5g, channel_5g_ap2]
451
452        self.reference_networks = []
453        self.wpa1_networks = []
454        self.wpa_networks = []
455        self.wep_networks = []
456        self.ent_networks = []
457        self.ent_networks_pwd = []
458        self.open_network = []
459        self.owe_networks = []
460        self.sae_networks = []
461        self.bssid_map = []
462        for i in range(ap_count):
463            network_list = []
464            if wpa1_network:
465                wpa1_dict = self.get_psk_network(mirror_ap,
466                                                 self.wpa1_networks,
467                                                 hidden, same_ssid,
468                                                 ssid_length_2g, ssid_length_5g,
469                                                 passphrase_length_2g,
470                                                 passphrase_length_5g)
471                wpa1_dict[hostapd_constants.BAND_2G]["security"] = "psk"
472                wpa1_dict[hostapd_constants.BAND_5G]["security"] = "psk"
473                self.wpa1_networks.append(wpa1_dict)
474                network_list.append(wpa1_dict)
475            if wpa_network:
476                wpa_dict = self.get_psk_network(mirror_ap,
477                                                self.reference_networks,
478                                                hidden, same_ssid,
479                                                ssid_length_2g, ssid_length_5g,
480                                                passphrase_length_2g,
481                                                passphrase_length_5g)
482                wpa_dict[hostapd_constants.BAND_2G]["security"] = "psk2"
483                wpa_dict[hostapd_constants.BAND_5G]["security"] = "psk2"
484                self.wpa_networks.append(wpa_dict)
485                network_list.append(wpa_dict)
486            if wep_network:
487                wep_dict = self.get_wep_network(mirror_ap, self.wep_networks,
488                                                hidden, same_ssid,
489                                                ssid_length_2g, ssid_length_5g)
490                network_list.append(wep_dict)
491            if ent_network:
492                ent_dict = self.get_open_network(mirror_ap, self.ent_networks,
493                                                 hidden, same_ssid,
494                                                 ssid_length_2g,
495                                                 ssid_length_5g)
496                ent_dict["2g"]["security"] = "wpa2"
497                ent_dict["2g"].update(radius_conf_2g)
498                ent_dict["5g"]["security"] = "wpa2"
499                ent_dict["5g"].update(radius_conf_5g)
500                network_list.append(ent_dict)
501            if ent_network_pwd:
502                ent_pwd_dict = self.get_open_network(mirror_ap,
503                                                     self.ent_networks_pwd,
504                                                     hidden, same_ssid,
505                                                     ssid_length_2g,
506                                                     ssid_length_5g)
507                ent_pwd_dict["2g"]["security"] = "wpa2"
508                ent_pwd_dict["2g"].update(radius_conf_pwd)
509                ent_pwd_dict["5g"]["security"] = "wpa2"
510                ent_pwd_dict["5g"].update(radius_conf_pwd)
511                network_list.append(ent_pwd_dict)
512            if open_network:
513                open_dict = self.get_open_network(mirror_ap, self.open_network,
514                                                  hidden, same_ssid,
515                                                  ssid_length_2g,
516                                                  ssid_length_5g)
517                network_list.append(open_dict)
518            if owe_network:
519                owe_dict = self.get_open_network(mirror_ap, self.owe_networks,
520                                                 hidden, same_ssid,
521                                                 ssid_length_2g,
522                                                 ssid_length_5g, "OWE")
523                owe_dict[hostapd_constants.BAND_2G]["security"] = "owe"
524                owe_dict[hostapd_constants.BAND_5G]["security"] = "owe"
525                network_list.append(owe_dict)
526            if sae_network:
527                sae_dict = self.get_psk_network(mirror_ap, self.sae_networks,
528                                                hidden, same_ssid,
529                                                hostapd_constants.SAE_KEY_MGMT,
530                                                ssid_length_2g, ssid_length_5g,
531                                                passphrase_length_2g,
532                                                passphrase_length_5g)
533                sae_dict[hostapd_constants.BAND_2G]["security"] = "sae"
534                sae_dict[hostapd_constants.BAND_5G]["security"] = "sae"
535                network_list.append(sae_dict)
536            self.access_points[i].configure_ap(network_list, channels_2g[i],
537                                               channels_5g[i])
538            self.access_points[i].start_ap()
539            self.bssid_map.append(
540                self.access_points[i].get_bssids_for_wifi_networks())
541            if mirror_ap:
542                self.access_points[i + 1].configure_ap(network_list,
543                                                       channels_2g[i+1],
544                                                       channels_5g[i+1])
545                self.access_points[i + 1].start_ap()
546                self.bssid_map.append(
547                    self.access_points[i + 1].get_bssids_for_wifi_networks())
548                break
549
550    def legacy_configure_ap_and_start(
551            self,
552            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
553            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
554            max_2g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_2G,
555            max_5g_networks=hostapd_constants.AP_DEFAULT_MAX_SSIDS_5G,
556            ap_ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
557            ap_passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
558            ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
559            ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
560            hidden=False,
561            same_ssid=False,
562            mirror_ap=True,
563            wpa_network=False,
564            wep_network=False,
565            ent_network=False,
566            radius_conf_2g=None,
567            radius_conf_5g=None,
568            ent_network_pwd=False,
569            radius_conf_pwd=None,
570            ap_count=1):
571
572        config_count = 1
573        count = 0
574
575        # For example, the NetworkSelector tests use 2 APs and require that
576        # both APs are not mirrored.
577        if not mirror_ap and ap_count == 1:
578            raise ValueError("ap_count cannot be 1 if mirror_ap is False.")
579
580        if not mirror_ap:
581            config_count = ap_count
582
583        self.user_params["reference_networks"] = []
584        self.user_params["open_network"] = []
585        if wpa_network:
586            self.user_params["wpa_networks"] = []
587        if wep_network:
588            self.user_params["wep_networks"] = []
589        if ent_network:
590            self.user_params["ent_networks"] = []
591        if ent_network_pwd:
592            self.user_params["ent_networks_pwd"] = []
593
594        # kill hostapd & dhcpd if the cleanup was not successful
595        for i in range(len(self.access_points)):
596            self.log.debug("Check ap state and cleanup")
597            self._cleanup_hostapd_and_dhcpd(i)
598
599        for count in range(config_count):
600
601            network_list_2g = []
602            network_list_5g = []
603
604            orig_network_list_2g = []
605            orig_network_list_5g = []
606
607            network_list_2g.append({"channel": channel_2g})
608            network_list_5g.append({"channel": channel_5g})
609
610            networks_dict = self.get_psk_network(
611                mirror_ap,
612                self.user_params["reference_networks"],
613                hidden=hidden,
614                same_ssid=same_ssid)
615            self.reference_networks = self.user_params["reference_networks"]
616
617            network_list_2g.append(networks_dict["2g"])
618            network_list_5g.append(networks_dict["5g"])
619
620            # When same_ssid is set, only configure one set of WPA networks.
621            # We cannot have more than one set because duplicate interface names
622            # are not allowed.
623            # TODO(bmahadev): Provide option to select the type of network,
624            # instead of defaulting to WPA.
625            if not same_ssid:
626                networks_dict = self.get_open_network(
627                    mirror_ap,
628                    self.user_params["open_network"],
629                    hidden=hidden,
630                    same_ssid=same_ssid)
631                self.open_network = self.user_params["open_network"]
632
633                network_list_2g.append(networks_dict["2g"])
634                network_list_5g.append(networks_dict["5g"])
635
636                if wpa_network:
637                    networks_dict = self.get_psk_network(
638                        mirror_ap,
639                        self.user_params["wpa_networks"],
640                        hidden=hidden,
641                        same_ssid=same_ssid,
642                        security_mode=hostapd_constants.WPA_STRING)
643                    self.wpa_networks = self.user_params["wpa_networks"]
644
645                    network_list_2g.append(networks_dict["2g"])
646                    network_list_5g.append(networks_dict["5g"])
647
648                if wep_network:
649                    networks_dict = self.get_wep_network(
650                        mirror_ap,
651                        self.user_params["wep_networks"],
652                        hidden=hidden,
653                        same_ssid=same_ssid)
654                    self.wep_networks = self.user_params["wep_networks"]
655
656                    network_list_2g.append(networks_dict["2g"])
657                    network_list_5g.append(networks_dict["5g"])
658
659                if ent_network:
660                    networks_dict = self.get_open_network(
661                        mirror_ap,
662                        self.user_params["ent_networks"],
663                        hidden=hidden,
664                        same_ssid=same_ssid)
665                    networks_dict["2g"][
666                        "security"] = hostapd_constants.ENT_STRING
667                    networks_dict["2g"].update(radius_conf_2g)
668                    networks_dict["5g"][
669                        "security"] = hostapd_constants.ENT_STRING
670                    networks_dict["5g"].update(radius_conf_5g)
671                    self.ent_networks = self.user_params["ent_networks"]
672
673                    network_list_2g.append(networks_dict["2g"])
674                    network_list_5g.append(networks_dict["5g"])
675
676                if ent_network_pwd:
677                    networks_dict = self.get_open_network(
678                        mirror_ap,
679                        self.user_params["ent_networks_pwd"],
680                        hidden=hidden,
681                        same_ssid=same_ssid)
682                    networks_dict["2g"][
683                        "security"] = hostapd_constants.ENT_STRING
684                    networks_dict["2g"].update(radius_conf_pwd)
685                    networks_dict["5g"][
686                        "security"] = hostapd_constants.ENT_STRING
687                    networks_dict["5g"].update(radius_conf_pwd)
688                    self.ent_networks_pwd = self.user_params[
689                        "ent_networks_pwd"]
690
691                    network_list_2g.append(networks_dict["2g"])
692                    network_list_5g.append(networks_dict["5g"])
693
694            orig_network_list_5g = copy.copy(network_list_5g)
695            orig_network_list_2g = copy.copy(network_list_2g)
696
697            if len(network_list_5g) > 1:
698                self.config_5g = self._generate_legacy_ap_config(
699                    network_list_5g)
700            if len(network_list_2g) > 1:
701                self.config_2g = self._generate_legacy_ap_config(
702                    network_list_2g)
703
704            self.access_points[count].start_ap(self.config_2g)
705            self.access_points[count].start_ap(self.config_5g)
706            self.populate_bssid(count, self.access_points[count],
707                                orig_network_list_5g, orig_network_list_2g)
708
709        # Repeat configuration on the second router.
710        if mirror_ap and ap_count == 2:
711            self.access_points[AP_2].start_ap(self.config_2g)
712            self.access_points[AP_2].start_ap(self.config_5g)
713            self.populate_bssid(AP_2, self.access_points[AP_2],
714                                orig_network_list_5g, orig_network_list_2g)
715
716    def _kill_processes(self, ap, daemon):
717        """ Kill hostapd and dhcpd daemons
718
719        Args:
720            ap: AP to cleanup
721            daemon: process to kill
722
723        Returns: True/False if killing process is successful
724        """
725        self.log.info("Killing %s" % daemon)
726        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
727        if pids.stdout:
728            ap.ssh.run('kill %s' % pids.stdout, ignore_status=True)
729        time.sleep(3)
730        pids = ap.ssh.run('pidof %s' % daemon, ignore_status=True)
731        if pids.stdout:
732            return False
733        return True
734
735    def _cleanup_hostapd_and_dhcpd(self, count):
736        """ Check if AP was cleaned up properly
737
738        Kill hostapd and dhcpd processes if cleanup was not successful in the
739        last run
740
741        Args:
742            count: AP to check
743
744        Returns:
745            New AccessPoint object if AP required cleanup
746
747        Raises:
748            Error: if the AccessPoint timed out to setup
749        """
750        ap = self.access_points[count]
751        phy_ifaces = ap.interfaces.get_physical_interface()
752        kill_hostapd = False
753        for iface in phy_ifaces:
754            if '2g_' in iface or '5g_' in iface or 'xg_' in iface:
755                kill_hostapd = True
756                break
757
758        if not kill_hostapd:
759            return
760
761        self.log.debug("Cleanup AP")
762        if not self._kill_processes(ap, 'hostapd') or \
763            not self._kill_processes(ap, 'dhcpd'):
764            raise ("Failed to cleanup AP")
765
766        ap.__init__(self.user_params['AccessPoint'][count])
767
768    def _generate_legacy_ap_config(self, network_list):
769        bss_settings = []
770        wlan_2g = self.access_points[AP_1].wlan_2g
771        wlan_5g = self.access_points[AP_1].wlan_5g
772        ap_settings = network_list.pop(0)
773        # TODO:(bmahadev) This is a bug. We should not have to pop the first
774        # network in the list and treat it as a separate case. Instead,
775        # create_ap_preset() should be able to take NULL ssid and security and
776        # build config based on the bss_Settings alone.
777        hostapd_config_settings = network_list.pop(0)
778        for network in network_list:
779            if "password" in network:
780                bss_settings.append(
781                    hostapd_bss_settings.BssSettings(
782                        name=network["SSID"],
783                        ssid=network["SSID"],
784                        hidden=network["hiddenSSID"],
785                        security=hostapd_security.Security(
786                            security_mode=network["security"],
787                            password=network["password"])))
788            elif "wepKeys" in network:
789                bss_settings.append(
790                    hostapd_bss_settings.BssSettings(
791                        name=network["SSID"],
792                        ssid=network["SSID"],
793                        hidden=network["hiddenSSID"],
794                        security=hostapd_security.Security(
795                            security_mode=network["security"],
796                            password=network["wepKeys"][0])))
797            elif network["security"] == hostapd_constants.ENT_STRING:
798                bss_settings.append(
799                    hostapd_bss_settings.BssSettings(
800                        name=network["SSID"],
801                        ssid=network["SSID"],
802                        hidden=network["hiddenSSID"],
803                        security=hostapd_security.Security(
804                            security_mode=network["security"],
805                            radius_server_ip=network["radius_server_ip"],
806                            radius_server_port=network["radius_server_port"],
807                            radius_server_secret=network[
808                                "radius_server_secret"])))
809            else:
810                bss_settings.append(
811                    hostapd_bss_settings.BssSettings(
812                        name=network["SSID"],
813                        ssid=network["SSID"],
814                        hidden=network["hiddenSSID"]))
815        if "password" in hostapd_config_settings:
816            config = hostapd_ap_preset.create_ap_preset(
817                iface_wlan_2g=wlan_2g,
818                iface_wlan_5g=wlan_5g,
819                channel=ap_settings["channel"],
820                ssid=hostapd_config_settings["SSID"],
821                hidden=hostapd_config_settings["hiddenSSID"],
822                security=hostapd_security.Security(
823                    security_mode=hostapd_config_settings["security"],
824                    password=hostapd_config_settings["password"]),
825                bss_settings=bss_settings)
826        elif "wepKeys" in hostapd_config_settings:
827            config = hostapd_ap_preset.create_ap_preset(
828                iface_wlan_2g=wlan_2g,
829                iface_wlan_5g=wlan_5g,
830                channel=ap_settings["channel"],
831                ssid=hostapd_config_settings["SSID"],
832                hidden=hostapd_config_settings["hiddenSSID"],
833                security=hostapd_security.Security(
834                    security_mode=hostapd_config_settings["security"],
835                    password=hostapd_config_settings["wepKeys"][0]),
836                bss_settings=bss_settings)
837        else:
838            config = hostapd_ap_preset.create_ap_preset(
839                iface_wlan_2g=wlan_2g,
840                iface_wlan_5g=wlan_5g,
841                channel=ap_settings["channel"],
842                ssid=hostapd_config_settings["SSID"],
843                hidden=hostapd_config_settings["hiddenSSID"],
844                bss_settings=bss_settings)
845        return config
846
847    def configure_packet_capture(
848            self,
849            channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
850            channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G):
851        """Configure packet capture for 2G and 5G bands.
852
853        Args:
854            channel_5g: Channel to set the monitor mode to for 5G band.
855            channel_2g: Channel to set the monitor mode to for 2G band.
856        """
857        self.packet_capture = self.packet_capture[0]
858        result = self.packet_capture.configure_monitor_mode(
859            hostapd_constants.BAND_2G, channel_2g)
860        if not result:
861            raise ValueError("Failed to configure channel for 2G band")
862
863        result = self.packet_capture.configure_monitor_mode(
864            hostapd_constants.BAND_5G, channel_5g)
865        if not result:
866            raise ValueError("Failed to configure channel for 5G band.")
867
868    @staticmethod
869    def wifi_test_wrap(fn):
870        def _safe_wrap_test_case(self, *args, **kwargs):
871            test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
872                                    self.log_begin_time.replace(' ', '-'))
873            self.test_id = test_id
874            self.result_detail = ""
875            tries = int(self.user_params.get("wifi_auto_rerun", 3))
876            for ad in self.android_devices:
877                ad.log_path = self.log_path
878            for i in range(tries + 1):
879                result = True
880                if i > 0:
881                    log_string = "[Test Case] RETRY:%s %s" % (i,
882                                                              self.test_name)
883                    self.log.info(log_string)
884                    self._teardown_test(self.test_name)
885                    self._setup_test(self.test_name)
886                try:
887                    result = fn(self, *args, **kwargs)
888                except signals.TestFailure as e:
889                    self.log.warn("Error msg: %s" % e)
890                    if self.result_detail:
891                        signal.details = self.result_detail
892                    result = False
893                except signals.TestSignal:
894                    if self.result_detail:
895                        signal.details = self.result_detail
896                    raise
897                except Exception as e:
898                    self.log.exception(e)
899                    asserts.fail(self.result_detail)
900                if result is False:
901                    if i < tries:
902                        continue
903                else:
904                    break
905            if result is not False:
906                asserts.explicit_pass(self.result_detail)
907            else:
908                asserts.fail(self.result_detail)
909
910        return _safe_wrap_test_case
911