• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2016 Google, Inc.
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import ipaddress
18import logging
19import os
20import re
21import shutil
22import random
23import subprocess
24import time
25
26from retry import retry
27from typing import Optional, Union
28
29from collections import namedtuple
30from enum import IntEnum
31from queue import Empty
32
33from acts import asserts
34from acts import context
35from acts import signals
36from acts import utils
37from acts.controllers import attenuator
38from acts.controllers.ap_lib import hostapd_security
39from acts.controllers.ap_lib import hostapd_ap_preset
40from acts.controllers.ap_lib.hostapd_constants import BAND_2G
41from acts.controllers.ap_lib.hostapd_constants import BAND_5G
42from acts_contrib.test_utils.net import connectivity_const as cconsts
43from acts_contrib.test_utils.tel import tel_defines
44from acts_contrib.test_utils.wifi import wifi_constants
45from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
46
# Default timeout, in seconds, used after reboot and after toggling WiFi or
# Airplane mode, to let the system settle down after the operation.
DEFAULT_TIMEOUT = 10
# Number of seconds to wait for events that are supposed to happen quickly,
# such as onSuccess for starting a background scan and the confirmation of a
# wifi state change.
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
DEFAULT_SCAN_TRIES = 3
DEFAULT_CONNECT_TRIES = 3
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458
# Number of seconds to wait before retrying a failed WiFi scan.
WIFI_SCAN_RETRY_INTERVAL_SEC = 5

# NOTE(review): despite the name this is a full HTTPS URL, not a bare host or
# IP address -- confirm how callers consume it.
DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"

# Named attenuation presets for roaming tests. Presumably each list holds one
# value per attenuator channel (first two for AP1, last two for AP2) -- TODO
# confirm against the callers.
ROAMING_ATTN = {
    "AP1_on_AP2_off": [0, 0, 95, 95],
    "AP1_off_AP2_on": [95, 95, 0, 0],
    "default": [0, 0, 0, 0]
}
70
71
class WifiEnums():
    """Namespace of Wi-Fi related constants, enums and lookup tables.

    The class is never instantiated by the code in this file; every member is
    read as a class attribute (e.g. WifiEnums.SSID_KEY).
    """

    # Keys of the dicts that describe a Wi-Fi network / suggestion.
    SSID_KEY = "SSID"  # Used for Wifi & SoftAp
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"  # Used for Wifi & SoftAp
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"  # Used for Wifi & SoftAp
    # NOTE: lower-case name kept as-is because it is part of the public
    # interface of this class.
    frequency_key = "frequency"
    HIDDEN_KEY = "hiddenSSID"  # Used for Wifi & SoftAp
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_SUGGESTION_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"  # Used for Wifi & SoftAp

    # Used for SoftAp
    AP_BAND_KEY = "apBand"
    AP_CHANNEL_KEY = "apChannel"
    AP_BANDS_KEY = "apBands"
    AP_CHANNEL_FREQUENCYS_KEY = "apChannelFrequencies"
    AP_MAC_RANDOMIZATION_SETTING_KEY = "MacRandomizationSetting"
    AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY = "BridgedModeOpportunisticShutdownEnabled"
    AP_IEEE80211AX_ENABLED_KEY = "Ieee80211axEnabled"
    AP_MAXCLIENTS_KEY = "MaxNumberOfClients"
    AP_SHUTDOWNTIMEOUT_KEY = "ShutdownTimeoutMillis"
    AP_SHUTDOWNTIMEOUTENABLE_KEY = "AutoShutdownEnabled"
    AP_CLIENTCONTROL_KEY = "ClientControlByUserEnabled"
    AP_ALLOWEDLIST_KEY = "AllowedClientList"
    AP_BLOCKEDLIST_KEY = "BlockedClientList"

    # SoftAp band selectors. The combined values are the bitwise OR of the
    # single-band values (2G=1, 5G=2, 6G=4).
    WIFI_CONFIG_SOFTAP_BAND_2G = 1
    WIFI_CONFIG_SOFTAP_BAND_5G = 2
    WIFI_CONFIG_SOFTAP_BAND_2G_5G = 3
    WIFI_CONFIG_SOFTAP_BAND_6G = 4
    WIFI_CONFIG_SOFTAP_BAND_2G_6G = 5
    WIFI_CONFIG_SOFTAP_BAND_5G_6G = 6
    WIFI_CONFIG_SOFTAP_BAND_ANY = 7

    # DO NOT USE IT for new test case! Replaced by WIFI_CONFIG_SOFTAP_BAND_
    WIFI_CONFIG_APBAND_2G = WIFI_CONFIG_SOFTAP_BAND_2G
    WIFI_CONFIG_APBAND_5G = WIFI_CONFIG_SOFTAP_BAND_5G
    WIFI_CONFIG_APBAND_AUTO = WIFI_CONFIG_SOFTAP_BAND_2G_5G

    WIFI_CONFIG_APBAND_2G_OLD = 0
    WIFI_CONFIG_APBAND_5G_OLD = 1
    WIFI_CONFIG_APBAND_AUTO_OLD = -1

    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class SoftApSecurityType():
        """Security type strings used for SoftAp configurations."""
        OPEN = "NONE"
        WPA2 = "WPA2_PSK"
        WPA3_SAE_TRANSITION = "WPA3_SAE_TRANSITION"
        WPA3_SAE = "WPA3_SAE"

    class CountryCode():
        """Two-letter country codes, plus an UNKNOWN placeholder."""
        AUSTRALIA = "AU"
        CHINA = "CN"
        GERMANY = "DE"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP
    # EAP types
    class Eap(IntEnum):
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    # EAP Phase2 types
    class EapPhase2(IntEnum):
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        # Enterprise Config Macros
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
        OCSP = "ocsp"

    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        """Channel width codes reported in scan results."""
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    class RttPreamble(IntEnum):
        # Single-bit values, so they can be combined as a bitmask.
        PREAMBLE_LEGACY = 0x01
        PREAMBLE_HT = 0x02
        PREAMBLE_VHT = 0x04

    class RttBW(IntEnum):
        # Single-bit values, so they can be combined as a bitmask.
        BW_5_SUPPORT = 0x01
        BW_10_SUPPORT = 0x02
        BW_20_SUPPORT = 0x04
        BW_40_SUPPORT = 0x08
        BW_80_SUPPORT = 0x10
        BW_160_SUPPORT = 0x20

    class Rtt(IntEnum):
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    class RttParam:
        """Key names of the fields of an RTT request."""
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    # Margin of error per RTT bandwidth.
    # NOTE(review): unit is presumably meters -- confirm against the callers.
    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5
    }

    # Macros as specified in the WifiScanner code.
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = 6  # 5 GHz band with DFS channels
    WIFI_BAND_BOTH = 3  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = 7  # both bands with DFS channels

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    DFS_5G_FREQUENCIES = [
        5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620, 5640,
        5660, 5680, 5700, 5720
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    # Frequencies covered by each WIFI_BAND_* selector.
    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
    }

    # TODO: add all of the band mapping.
    softap_band_frequencies = {
        WIFI_CONFIG_SOFTAP_BAND_2G: ALL_2G_FREQUENCIES,
        WIFI_CONFIG_SOFTAP_BAND_5G: ALL_5G_FREQUENCIES
    }

    # All Wifi frequencies to channels lookup.
    # Every frequency listed in ALL_2G_FREQUENCIES / ALL_5G_FREQUENCIES and
    # every value of channel_2G_to_freq / channel_5G_to_freq has an entry
    # here, so the tables can be used as inverses of each other.
    freq_to_channel = {
        2412: 1,
        2417: 2,
        2422: 3,
        2427: 4,
        2432: 5,
        2437: 6,
        2442: 7,
        2447: 8,
        2452: 9,
        2457: 10,
        2462: 11,
        2467: 12,
        2472: 13,
        2484: 14,
        4915: 183,
        4920: 184,
        4925: 185,
        4935: 187,
        4940: 188,
        4945: 189,
        4960: 192,
        4980: 196,
        5035: 7,
        5040: 8,
        5045: 9,
        5055: 11,
        5060: 12,
        5080: 16,
        5170: 34,
        5180: 36,
        5190: 38,
        5200: 40,
        5210: 42,
        5220: 44,
        5230: 46,
        5240: 48,
        5250: 50,
        5260: 52,
        5280: 56,
        5300: 60,
        5320: 64,
        5500: 100,
        5520: 104,
        5540: 108,
        5560: 112,
        5580: 116,
        5600: 120,
        5620: 124,
        5640: 128,
        5660: 132,
        5680: 136,
        5700: 140,
        5720: 144,
        5745: 149,
        5755: 151,
        5765: 153,
        5775: 155,
        5785: 157,
        5795: 159,
        5805: 161,
        5825: 165,
        5845: 169,
        5865: 173,
        5885: 177
    }

    # All Wifi channels to frequencies lookup.
    channel_2G_to_freq = {
        1: 2412,
        2: 2417,
        3: 2422,
        4: 2427,
        5: 2432,
        6: 2437,
        7: 2442,
        8: 2447,
        9: 2452,
        10: 2457,
        11: 2462,
        12: 2467,
        13: 2472,
        14: 2484
    }

    channel_5G_to_freq = {
        183: 4915,
        184: 4920,
        185: 4925,
        187: 4935,
        188: 4940,
        189: 4945,
        192: 4960,
        196: 4980,
        7: 5035,
        8: 5040,
        9: 5045,
        11: 5055,
        12: 5060,
        16: 5080,
        34: 5170,
        36: 5180,
        38: 5190,
        40: 5200,
        42: 5210,
        44: 5220,
        46: 5230,
        48: 5240,
        50: 5250,
        52: 5260,
        56: 5280,
        60: 5300,
        64: 5320,
        100: 5500,
        104: 5520,
        108: 5540,
        112: 5560,
        116: 5580,
        120: 5600,
        124: 5620,
        128: 5640,
        132: 5660,
        136: 5680,
        140: 5700,
        144: 5720,
        149: 5745,
        151: 5755,
        153: 5765,
        155: 5775,
        157: 5785,
        159: 5795,
        161: 5805,
        165: 5825,
        169: 5845,
        173: 5865,
        177: 5885
    }

    # 6 GHz channels 1, 5, 9, ..., 233 mapped to 5955, 5975, ..., 7115 MHz.
    channel_6G_to_freq = {4 * x + 1: 5955 + 20 * x for x in range(59)}

    # Per-band channel -> frequency tables, keyed by band name.
    channel_to_freq = {
        '2G': channel_2G_to_freq,
        '5G': channel_5G_to_freq,
        '6G': channel_6G_to_freq
    }
460
461
class WifiChannelBase:
    """Base container for the channel frequency lists of one region.

    The class-level lists are all empty; subclasses are expected to provide
    the real frequencies (as class or instance attributes).
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the list of frequencies that belong to a scan band.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* selectors handled below.

        Returns:
            The frequency list of this object for the given band.

        Raises:
            KeyError: If band is not one of the handled selectors.
        """
        freqs_2g = self.ALL_2G_FREQUENCIES
        freqs_5g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        freqs_5g_dfs = self.DFS_5G_FREQUENCIES
        freqs_5g_all = self.ALL_5G_FREQUENCIES

        # The table is rebuilt on every call so that it always reflects the
        # current attribute values of this instance.
        frequencies_by_band = {}
        frequencies_by_band[WifiEnums.WIFI_BAND_24_GHZ] = freqs_2g
        frequencies_by_band[WifiEnums.WIFI_BAND_5_GHZ] = freqs_5g_no_dfs
        frequencies_by_band[WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY] = freqs_5g_dfs
        frequencies_by_band[WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS] = freqs_5g_all
        frequencies_by_band[WifiEnums.WIFI_BAND_BOTH] = (freqs_2g +
                                                         freqs_5g_no_dfs)
        frequencies_by_band[WifiEnums.WIFI_BAND_BOTH_WITH_DFS] = (freqs_5g_all +
                                                                  freqs_2g)
        return frequencies_by_band[band]
485
486
class WifiChannelUS(WifiChannelBase):
    """Channel frequency lists for the US region."""
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520, 5560,
        5700, 5745, 5805
    ]

    def __init__(self, model=None, support_addition_channel=None):
        """Builds the per-instance frequency lists.

        Args:
            model: Device model name, or None.
            support_addition_channel: Collection of model names for which the
                2.4 GHz list is extended with 2467 and 2472 MHz. None (the
                default) or an empty collection means no model gets them.
        """
        # None instead of a mutable [] default: avoids sharing one list object
        # across calls and tolerates callers that pass None explicitly.
        if support_addition_channel and model in support_addition_channel:
            self.ALL_2G_FREQUENCIES = [
                2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457,
                2462, 2467, 2472
            ]
        self.DFS_5G_FREQUENCIES = [
            5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620,
            5640, 5660, 5680, 5700, 5720, 5845, 5865, 5885
        ]
        # Recomputed per instance because the class-level value inherited from
        # the base class was built from empty lists.
        self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES
511
512
class WifiReferenceNetworks:
    """Sorts the entries of reference_networks by band and by whether they
    carry a password, and hands the resulting lists back on request.
    """
    def __init__(self, obj):
        """Stores the reference networks and classifies them right away.

        Args:
            obj: Iterable of mappings from a band key ("2g" or anything else)
                 to a network config.
        """
        self.reference_networks = obj
        self.WIFI_2G = "2g"
        self.WIFI_5G = "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Fills the four secure/open x 2g/5g lists.

        Any band key other than "2g" is filed under 5g. A config counts as
        secure when it contains a "password" entry.
        """
        for entry in self.reference_networks:
            for band in entry:
                config = entry[band]
                is_2g = band == self.WIFI_2G
                if "password" in config:
                    bucket = (self.secure_networks_2g
                              if is_2g else self.secure_networks_5g)
                else:
                    bucket = (self.open_networks_2g
                              if is_2g else self.open_networks_5g)
                bucket.append(config)

    def return_2g_secure_networks(self):
        """Returns the password-protected 2g networks."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the password-protected 5g networks."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the 2g networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the 5g networks without a password."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns all password-protected networks, 2g first, as a new list."""
        combined = list(self.secure_networks_2g)
        combined.extend(self.secure_networks_5g)
        return combined

    def return_open_networks(self):
        """Returns all networks without a password, 2g first, as a new list."""
        combined = list(self.open_networks_2g)
        combined.extend(self.open_networks_5g)
        return combined
559
560
561def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
562    """Wrapper function that handles the bahevior of assert_on_fail.
563
564    When assert_on_fail is True, let all test signals through, which can
565    terminate test cases directly. When assert_on_fail is False, the wrapper
566    raises no test signals and reports operation status by returning True or
567    False.
568
569    Args:
570        func: The function to wrap. This function reports operation status by
571              raising test signals.
572        assert_on_fail: A boolean that specifies if the output of the wrapper
573                        is test signal based or return value based.
574        args: Positional args for func.
575        kwargs: Name args for func.
576
577    Returns:
578        If assert_on_fail is True, returns True/False to signal operation
579        status, otherwise return nothing.
580    """
581    try:
582        func(*args, **kwargs)
583        if not assert_on_fail:
584            return True
585    except signals.TestSignal:
586        if assert_on_fail:
587            raise
588        return False
589
590
def assert_network_in_list(target, network_list):
    """Asserts that a target Wi-Fi network is present in a list of Wi-Fi
    networks.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    matches = match_networks(target, network_list)
    failure_msg = "Target network %s, does not exist in network list %s" % (
        target, network_list)
    # An empty match list is falsy and therefore fails the assertion.
    asserts.assert_true(matches, failure_msg)
604
605
def match_networks(target_params, networks):
    """Selects, from a list of WiFi networks, those that match a given set of
    parameters.

    A network matches when it contains every key of target_params with an
    equal value.

    Args:
        target_params: A dict with 1 or more key-value pairs representing a Wi-Fi network.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A list with the networks that match the target parameters, in their
        original order.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _is_match(candidate):
        # Stops at the first missing key or differing value.
        return all(key in candidate and candidate[key] == value
                   for key, value in target_params.items())

    return [candidate for candidate in networks if _is_match(candidate)]
636
637
def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits until the device reports the requested wifi state.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True if the device transitions to the
        specified state and False otherwise. If assert_on_fail is True, no
        return value.
    """
    # Delegates the waiting to _wait_for_wifi_state; the handler translates
    # its test signals according to assert_on_fail.
    outcome = _assert_on_fail_handler(_wait_for_wifi_state, assert_on_fail,
                                      ad, state)
    return outcome
655
656
def _wait_for_wifi_state(ad, state):
    """Waits for the device to reach the given wifi state.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    # Nothing to wait for when the state is already reached; waiting anyway
    # would block on a state change event that never comes.
    if state == ad.droid.wifiCheckState():
        return

    ad.droid.wifiStartTrackingStateChange()
    fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
        state, ad.serial)

    def _reached_target(event):
        return event["data"]["enabled"] == state

    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reached_target, SHORT_TIMEOUT)
    except Empty:
        # No matching event arrived in time; fall back to polling the state.
        asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()
681
682
def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets wifi to the requested state, or flips it.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, True if the toggle was successful and
        False otherwise. If assert_on_fail is True, no return value.
    """
    # Delegates the toggling to _wifi_toggle_state; the handler translates its
    # test signals according to assert_on_fail.
    outcome = _assert_on_fail_handler(_wifi_toggle_state, assert_on_fail, ad,
                                      new_state)
    return outcome
700
701
def _wifi_toggle_state(ad, new_state=None):
    """Sets wifi to the requested state and waits for the change.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    current_state = ad.droid.wifiCheckState()
    if new_state is None:
        new_state = not current_state
    elif new_state == current_state:
        # Already in the requested state; waiting would block on a state
        # change event that never comes.
        return

    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    ad.ed.clear_all_events()
    # Setting wifi state.
    ad.droid.wifiToggleState(new_state)
    time.sleep(2)
    fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
                                                           ad.serial)

    def _reached_target(event):
        return event["data"]["enabled"] == new_state

    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reached_target, SHORT_TIMEOUT)
    except Empty:
        # No matching event arrived in time; fall back to polling the state.
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()
734
735
def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    Each configured network is forgotten once per network id. A missing
    confirmation event for a single network is only logged; the final check
    decides whether the reset succeeded. This function does not change the
    Wi-Fi on/off state itself.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal (through asserts.assert_true) if configured
        networks are still present after the removal.
    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    removed_ids = set()
    for network in networks:
        network_id = network['networkId']
        if network_id in removed_ids:
            # The same id may be listed more than once; forget it only once.
            continue
        ad.droid.wifiForgetNetwork(network_id)
        removed_ids.add(network_id)
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            logging.warning("Could not confirm the removal of network %s.",
                            network)
    # Check again to see if there's any network left, and report the ones
    # that actually remain rather than the list we started with.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)
764
765
766
def toggle_airplane_mode_on_and_off(ad):
    """Turns Airplane mode ON and then OFF again.

    After each switch the function sleeps for DEFAULT_TIMEOUT seconds to let
    the system settle down.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal (through asserts.assert_true) if turning
        Airplane mode on or off fails.
    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(utils.force_airplane_mode(ad, True),
                        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    # This step turns the mode OFF, so the message must say so.
    asserts.assert_true(utils.force_airplane_mode(ad, False),
                        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
782
783
def toggle_wifi_off_and_on(ad):
    """Turns WiFi OFF and then ON again.

    After each switch the function sleeps for DEFAULT_TIMEOUT seconds.
    Failures are raised as test signals by wifi_toggle_state.

    Args:
        ad: An AndroidDevice object.
    """
    steps = (
        ("Toggling wifi OFF.", False),
        ("Toggling wifi ON.", True),
    )
    for message, enabled in steps:
        ad.log.debug(message)
        wifi_toggle_state(ad, enabled)
        time.sleep(DEFAULT_TIMEOUT)
797
798
def wifi_forget_network(ad, net_ssid):
    """Forgets the first configured Wifi network whose SSID contains net_ssid.

    Args:
        ad: android_device object on which to forget the network.
        net_ssid: ssid of the network to forget. The comparison is a
                  containment check against the configured SSID.

    Raises:
        A test failure signal (through asserts.fail) if the removal is not
        confirmed within SHORT_TIMEOUT.
    """
    configured = ad.droid.wifiGetConfiguredNetworks()
    if not configured:
        return
    for network in configured:
        if net_ssid not in network[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % network)
        # Only the first matching network is handled.
        break
821
822
def wifi_test_device_init(ad, country_code=WifiEnums.CountryCode.US):
    """Prepares an android device for wifi testing.

    The steps run in this order:
    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan.
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging.
    5. Set the wpa_supplicant log level to EXCESSIVE (not verified).
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set the wifi country code to country_code.
    9. Turn off ambient display.

    Args:
        ad: An AndroidDevice object.
        country_code: Country code to set on the device; US by default.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(),
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    wpa_cli_cmd = ("wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
                   "wlan0 log_level EXCESSIVE")
    wpa_cli_output = ad.adb.shell(wpa_cli_cmd, ignore_status=True)
    ad.log.info("wpa_supplicant log change status: %s", wpa_cli_output)

    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    set_wifi_country_code(ad, country_code)
    utils.set_ambient_display(ad, False)
856
857
def set_wifi_country_code(ad, country_code):
    """Sets the wifi country code on the device.

    The code is first forced through the "cmd wifi force-country-code" shell
    command. If that command raises, the failure is logged and the
    ad.droid.wifiSetCountryCode call is used as a fallback with the same
    country code.

    Args:
        ad: An AndroidDevice object.
        country_code: 2 letter ISO country code

    Raises:
        An RpcException if unable to set the country code.
    """
    try:
        ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
    except Exception as e:
        # Best-effort fallback; it must honor the requested country code
        # instead of silently switching the device to US.
        ad.log.warning(
            "force-country-code failed (%s); falling back to "
            "wifiSetCountryCode.", e)
        ad.droid.wifiSetCountryCode(country_code)
872
873
def start_wifi_connection_scan(ad):
    """Triggers a wifi connection scan and waits until its results are
    available.

    Previously queued events are cleared first. The test fails if no results
    event arrives within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    """
    results_event = "WifiManagerScanResultsAvailable"
    wait_seconds = 60

    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        ad.ed.pop_event(results_event, wait_seconds)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")
886
887
def start_wifi_connection_scan_and_return_status(ad):
    """
    Triggers a wifi connection scan and waits until either its results are
    available or a scan failure is reported.

    The test fails if neither event arrives within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if scan succeeded & results are available
        False: if scan failed
    """
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        scan_events = ad.ed.pop_events(
            "WifiManagerScan(ResultsAvailable|Failure)", 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # Several events may have matched; a single success is enough.
    for scan_event in scan_events:
        event_name = scan_event["name"]
        if event_name == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
        elif event_name == "WifiManagerScanResultsAvailable":
            return True
    return False
914
915
def start_wifi_connection_scan_and_check_for_network(ad,
                                                     network_ssid,
                                                     max_tries=3,
                                                     found=True):
    """
    Runs connectivity scans and checks whether |network_ssid| shows up in the
    scan results. At most |max_tries| scans are performed.

    When the SSID is expected to be present, the first scan that contains it
    ends the search. When it is expected to be absent, all |max_tries| scans
    are run and the SSID must still be missing in the last one.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
        found: True if expected a given SSID to be found; False otherwise.
    Returns:
        True: if the presence/absence of network_ssid in the scan results is
              as expected.
        False: if it is not as expected, or the last scan failed.
    """
    began = time.time()
    for attempt in range(max_tries):
        is_last_attempt = (attempt + 1) == max_tries

        if not start_wifi_connection_scan_and_return_status(ad):
            if is_last_attempt:
                break
            # wait for a while when a WiFi scan is failed, e.g. because of device busy.
            time.sleep(WIFI_SCAN_RETRY_INTERVAL_SEC)
            continue

        matches = match_networks({WifiEnums.SSID_KEY: network_ssid},
                                 ad.droid.wifiGetScanResults())
        if found != (len(matches) > 0):
            # Not the expected outcome yet; scan again.
            continue
        if found:
            ad.log.debug("%s network found in %s seconds." %
                          (network_ssid, (time.time() - began)))
            return True
        # Absence is only trusted once every try has been used up.
        if is_last_attempt:
            ad.log.debug("%s network not found in %d tries in %s seconds." %
                         (network_ssid, max_tries, (time.time() - began)))
            return True
    return False
957
958
def start_wifi_connection_scan_and_ensure_network_found(
        ad, network_ssid, max_tries=3):
    """Assert that |network_ssid| appears in connectivity scan results.

    Runs start_wifi_connection_scan_and_check_for_network with found=True
    and hands its result to asserts.assert_true, so a falsy result fails
    the test.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    # Built before scanning, exactly as the message would be reported.
    failure_msg = "".join(("Failed to find ", network_ssid,
                           " in scan results", " after ", str(max_tries),
                           " tries"))
    network_seen = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries, True)
    asserts.assert_true(network_seen, failure_msg)
978
979
def start_wifi_connection_scan_and_ensure_network_not_found(
        ad, network_ssid, max_tries=3):
    """Assert that |network_ssid| is absent from connectivity scan results.

    Runs start_wifi_connection_scan_and_check_for_network with found=False
    and hands its result to asserts.assert_true, so a falsy result fails
    the test.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    # Built before scanning, exactly as the message would be reported.
    failure_msg = "".join(("Found ", network_ssid, " in scan results",
                           " after ", str(max_tries), " tries"))
    network_absent = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries, False)
    asserts.assert_true(network_absent, failure_msg)
999
1000
def start_wifi_background_scan(ad, scan_setting):
    """Start a wifi background scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the "WifiScannerScan<idx>onSuccess" event, where
        <idx> is the value returned by wifiScannerStartBackgroundScan. The
        event is awaited for SHORT_TIMEOUT seconds.
    """
    scan_idx = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    success_event_name = "WifiScannerScan%sonSuccess" % (scan_idx, )
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
1015
1016
def start_wifi_tethering(ad, ssid, password, band=None, hidden=None,
                         security=None):
    """Configure the soft AP on an android_device and start wifi tethering.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
        hidden: boolean to indicate if the AP needs to be hidden or not.
        security: security type of softap.

    Returns:
        No return value. Error checks in this function will raise test failure
        signals.
    """
    config = {WifiEnums.SSID_KEY: ssid}
    # Only truthy optional values are written into the configuration.
    optional_entries = (
        (WifiEnums.PWD_KEY, password),
        (WifiEnums.AP_BAND_KEY, band),
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.SECURITY, security),
    )
    for config_key, value in optional_entries:
        if value:
            config[config_key] = value

    config_applied = ad.droid.wifiSetWifiApConfiguration(config)
    asserts.assert_true(config_applied,
                        "Failed to update WifiAp Configuration")

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)

    def tether_is_active(event):
        return event["data"]["ACTIVE_TETHER"]

    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged", tether_is_active, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering starting")
    else:
        ad.log.debug("Tethering started successfully.")
    finally:
        # Tracking is always turned off again, whatever the outcome.
        ad.droid.wifiStopTrackingTetherStateChange()
1056
1057
def save_wifi_soft_ap_config(ad,
                             wifi_config,
                             band=None,
                             hidden=None,
                             security=None,
                             password=None,
                             channel=None,
                             max_clients=None,
                             shutdown_timeout_enable=None,
                             shutdown_timeout_millis=None,
                             client_control_enable=None,
                             allowedList=None,
                             blockedList=None,
                             bands=None,
                             channel_frequencys=None,
                             mac_randomization_setting=None,
                             bridged_opportunistic_shutdown_enabled=None,
                             ieee80211ax_enabled=None):
    """Save a soft ap configuration and verify it was stored.

    The optional arguments are merged into |wifi_config| (which is modified
    in place), the result is written with wifiSetWifiApConfiguration, and the
    configuration read back with wifiGetApConfiguration is compared key by
    key against what was written. The band keys are written but not
    compared.

    Args:
        ad: android_device to set soft ap configuration.
        wifi_config: a soft ap configuration dict, at least include SSID.
        band: specifies the band for the soft ap. Ignored when
                channel_frequencys or bands is given.
        hidden: specifies the soft ap need to broadcast its SSID or not.
        security: specifies the security type for the soft ap. Only applied
                together with a password.
        password: specifies the password for the soft ap. Only applied
                together with a security type.
        channel: specifies the channel for the soft ap. Only applied
                together with band.
        max_clients: specifies the maximum connected client number.
        shutdown_timeout_enable: specifies the auto shut down enable or not.
        shutdown_timeout_millis: specifies the shut down timeout value.
        client_control_enable: specifies the client control enable or not.
        allowedList: specifies allowed clients list.
        blockedList: specifies blocked clients list.
        bands: specifies the band list for the soft ap. Ignored when
                channel_frequencys is given.
        channel_frequencys: specifies the channel frequency list for soft ap.
        mac_randomization_setting: specifies the mac randomization setting.
        bridged_opportunistic_shutdown_enabled: specifies the opportunistic
                shutdown enable or not.
        ieee80211ax_enabled: specifies the ieee80211ax enable or not.
    """
    if security and password:
        wifi_config[WifiEnums.SECURITY] = security
        wifi_config[WifiEnums.PWD_KEY] = password

    # Settings that are copied verbatim whenever the caller supplied them.
    optional_settings = (
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.AP_MAXCLIENTS_KEY, max_clients),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY, shutdown_timeout_enable),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY, shutdown_timeout_millis),
        (WifiEnums.AP_CLIENTCONTROL_KEY, client_control_enable),
        (WifiEnums.AP_ALLOWEDLIST_KEY, allowedList),
        (WifiEnums.AP_BLOCKEDLIST_KEY, blockedList),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         mac_randomization_setting),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         bridged_opportunistic_shutdown_enabled),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY, ieee80211ax_enabled),
    )
    for config_key, value in optional_settings:
        if value is not None:
            wifi_config[config_key] = value

    # Exactly one way of selecting the band/channel is applied, in this
    # order of precedence: channel frequencies, band list, single band.
    if channel_frequencys is not None:
        wifi_config[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] = channel_frequencys
    elif bands is not None:
        wifi_config[WifiEnums.AP_BANDS_KEY] = bands
    elif band is not None:
        wifi_config[WifiEnums.AP_BAND_KEY] = band
        if channel is not None:
            wifi_config[WifiEnums.AP_CHANNEL_KEY] = channel

    # A channel value of 0 is dropped from the configuration.
    if wifi_config.get(WifiEnums.AP_CHANNEL_KEY) == 0:
        del wifi_config[WifiEnums.AP_CHANNEL_KEY]

    # An OPEN configuration carries neither a security type nor a password.
    # The password may legitimately be absent already, so remove it
    # tolerantly instead of raising KeyError.
    if (WifiEnums.SECURITY in wifi_config and wifi_config[WifiEnums.SECURITY]
            == WifiEnums.SoftApSecurityType.OPEN):
        del wifi_config[WifiEnums.SECURITY]
        wifi_config.pop(WifiEnums.PWD_KEY, None)

    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match")

    # Every key below is compared only when it is present in wifi_config.
    verified_settings = (
        (WifiEnums.SECURITY, "Hotspot Security doesn't match"),
        (WifiEnums.PWD_KEY, "Hotspot Password doesn't match"),
        (WifiEnums.HIDDEN_KEY, "Hotspot hidden setting doesn't match"),
        (WifiEnums.AP_CHANNEL_KEY, "Hotspot Channel doesn't match"),
        (WifiEnums.AP_MAXCLIENTS_KEY, "Hotspot Max Clients doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY,
         "Hotspot ShutDown feature flag doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY,
         "Hotspot ShutDown timeout setting doesn't match"),
        (WifiEnums.AP_CLIENTCONTROL_KEY,
         "Hotspot Client control flag doesn't match"),
        (WifiEnums.AP_ALLOWEDLIST_KEY, "Hotspot Allowed List doesn't match"),
        (WifiEnums.AP_BLOCKEDLIST_KEY, "Hotspot Blocked List doesn't match"),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         "Hotspot Mac randomization setting doesn't match"),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         "Hotspot bridged shutdown enable setting doesn't match"),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY,
         "Hotspot 80211 AX enable setting doesn't match"),
        (WifiEnums.AP_CHANNEL_FREQUENCYS_KEY,
         "Hotspot channels setting doesn't match"),
    )
    for config_key, mismatch_msg in verified_settings:
        if config_key in wifi_config:
            asserts.assert_true(
                wifi_ap[config_key] == wifi_config[config_key], mismatch_msg)
1223
def start_wifi_tethering_saved_config(ad):
    """Turn on wifi hotspot with a config that is already saved.

    Starts wifi tethering and waits for the
    "ConnectivityManagerOnTetheringStarted" event followed by a
    "TetherStateChanged" event whose ACTIVE_TETHER data is truthy
    (awaited for 30 seconds).

    Args:
        ad: android_device to start wifi tethering on.

    Returns:
        No return value. A missing confirmation event fails the test.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event means tethering did not start; other errors
        # (and KeyboardInterrupt/SystemExit) must not be masked as a test
        # failure.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()
1236
1237
def stop_wifi_tethering(ad):
    """Stop wifi tethering on an android_device and wait for confirmation.

    Waits up to 30 seconds for "WifiManagerApDisabled" and then up to 30
    seconds for a "TetherStateChanged" event whose ACTIVE_TETHER data is
    falsy. A missing event fails the test.

    Args:
        ad: android_device to stop wifi tethering on.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)

    def tether_is_inactive(event):
        return not event["data"]["ACTIVE_TETHER"]

    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event("TetherStateChanged", tether_is_inactive, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        # Tracking is always turned off again, whatever the outcome.
        ad.droid.wifiStopTrackingTetherStateChange()
1254
1255
def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Toggle wifi off and on, then wait for the device to reconnect.

    Thin wrapper that runs _toggle_wifi_and_wait_for_reconnection through
    _assert_on_fail_handler. The device is expected to be connected to
    |network| already.

    Steps performed by the wrapped function:
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is
        the one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection.
                 The dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    wrapped_kwargs = {"num_of_tries": num_of_tries}
    return _assert_on_fail_handler(_toggle_wifi_and_wait_for_reconnection,
                                   assert_on_fail, ad, network,
                                   **wrapped_kwargs)
1291
1292
def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=3):
    """Toggle wifi state and then wait for Android device to reconnect to
    the provided wifi network.

    This expects the device to be already connected to the provided network.

    Logic steps are
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to wait (30
                      seconds each) for the connected event before declaring
                      failure. Default is 3.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]
    # First ensure that we're already connected to the provided network.
    verify_con = {WifiEnums.SSID_KEY: expected_ssid}
    verify_wifi_connection_info(ad, verify_con)
    # Now toggle wifi state and wait for the connection event.
    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = None
        for _ in range(num_of_tries):
            try:
                connect_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
                                                 30)
                break
            except Empty:
                # No event within this wait; use the next try, if any.
                pass
        asserts.assert_true(
            connect_result, "Failed to connect to Wi-Fi network %s on %s" %
            (network, ad.serial))
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        # The two sentences of the message are separated by a space.
        asserts.assert_equal(
            actual_ssid, expected_ssid, "Connected to the wrong network on %s. "
            "Expected %s, but got %s." %
            (ad.serial, expected_ssid, actual_ssid))
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()
1348
1349
def wait_for_connect(ad,
                     expected_ssid=None,
                     expected_id=None,
                     tries=2,
                     assert_on_fail=True):
    """Wait for a wifi connect event.

    Thin wrapper that runs _wait_for_connect through
    _assert_on_fail_handler.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    wrapped_args = (ad, expected_ssid, expected_id, tries)
    return _assert_on_fail_handler(_wait_for_connect, assert_on_fail,
                                   *wrapped_args)
1373
1374
def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify the connected network.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to. Not verified when
                       empty or None.
        expected_id: Network Id of the network to connect to. Not verified
                     when None; 0 is treated as a real network id.
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: if no connect event arrives, the connected
            network does not match, or any other exception is raised while
            waiting for or checking the event.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 id=expected_id,
                                                 tries=tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network %s" % expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        # Compare against None rather than truthiness so that an expected
        # network id of 0 is still verified.
        if expected_id is not None:
            asserts.assert_equal(actual_id, expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        # Keep the original error attached as the cause for debugging.
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()
1413
1414
def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    Each try waits up to 30 seconds for one connect event. When neither
    |ssid| nor |id| is given, the first event popped is returned. Otherwise
    events are popped until one matches the given network id or SSID, or
    the tries run out.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to. 0 is a valid id and is
            matched like any other value.
        tries: An integer that is the number of times to try before failing.

    Returns:
        The last connect event popped, or None if no event was received.
        Note that when the tries run out this may be an event that does not
        match |ssid|/|id|; callers verify the returned data themselves.
        The event is a dict with details of the connection data, which looks
        like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }

    """
    conn_result = None
    # If ssid and network id is None, just wait for any connect event.
    accept_any_event = id is None and ssid is None

    for _ in range(tries):
        try:
            conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
        except Empty:
            # Nothing arrived during this wait; keep any earlier event and
            # use the next try.
            continue
        if accept_any_event:
            break
        # Compare the id against None rather than truthiness so that
        # network id 0 can be matched.
        if (id is not None
                and conn_result['data'][WifiEnums.NETID_KEY] == id):
            break
        if ssid and conn_result['data'][WifiEnums.SSID_KEY] == ssid:
            break

    return conn_result
1469
1470
def wait_for_disconnect(ad, timeout=10):
    """Fail the test unless a disconnect event arrives in time.

    State-change tracking is switched on for the duration of the wait and
    always switched off afterwards.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    Raises:
        signals.TestFailure: if no "WifiNetworkDisconnected" event is
            received within |timeout|.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        # Only the arrival of the event matters; its content is not used.
        ad.ed.pop_event("WifiNetworkDisconnected", timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
1486
1487
def ensure_no_disconnect(ad, duration=10):
    """Fail the test if a disconnect event arrives during |duration|.

    State-change tracking is switched on for the duration of the wait and
    always switched off afterwards.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    Raises:
        signals.TestFailure: if a "WifiNetworkDisconnected" event is
            received within |duration|.
    """
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event("WifiNetworkDisconnected", duration)
    except Empty:
        # No disconnect event showed up, which is the expected outcome.
        pass
    else:
        raise signals.TestFailure("Device disconnected from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()
1504
1505
def connect_to_wifi_network(ad, network, assert_on_fail=True,
                            check_connectivity=True, hidden=False,
                            num_of_scan_tries=DEFAULT_SCAN_TRIES,
                            num_of_connect_tries=DEFAULT_CONNECT_TRIES):
    """Scan for a network, then connect to it.

    Before connecting, the scan results are checked: a hidden network is
    asserted to be absent from them, any other network is asserted to be
    present.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect.
        hidden: Is the Wifi network hidden.
        num_of_scan_tries: The number of times to try scan
                           interface before declaring failure.
        num_of_connect_tries: The number of times to try
                              connect wifi before declaring failure.
    """
    if hidden:
        ensure_scan_state = (
            start_wifi_connection_scan_and_ensure_network_not_found)
    else:
        ensure_scan_state = start_wifi_connection_scan_and_ensure_network_found
    ensure_scan_state(ad,
                      network[WifiEnums.SSID_KEY],
                      max_tries=num_of_scan_tries)

    wifi_connect(ad,
                 network,
                 num_of_tries=num_of_connect_tries,
                 assert_on_fail=assert_on_fail,
                 check_connectivity=check_connectivity)
1534
1535
def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect to the given network using network id and verify SSID.

    The SSID is first asserted to be present in scan results, then the
    connection is made by network id, and finally the SSID reported by
    wifiGetConnectionInfo is compared with |network_ssid|.

    Args:
        ad: An AndroidDevice object.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if the connected SSID equals |network_ssid|;
             False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)
    connected_ssid = ad.droid.wifiGetConnectionInfo()[WifiEnums.SSID_KEY]
    ad.log.debug("Expected SSID = %s Connected SSID = %s" %
                 (network_ssid, connected_ssid))
    return connected_ssid == network_ssid
1556
1557
def wifi_connect(ad,
                 network,
                 num_of_tries=1,
                 assert_on_fail=True,
                 check_connectivity=True):
    """Connect an Android device to a wifi network.

    Thin wrapper that runs _wifi_connect through _assert_on_fail_handler.
    The wrapped function initiates the connection, waits for the
    "connected" event, then confirms the connected ssid is the one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.
        check_connectivity: Passed through to _wifi_connect, which validates
                            the connection after connecting when it is True.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    wrapped_kwargs = {
        "num_of_tries": num_of_tries,
        "check_connectivity": check_connectivity,
    }
    return _assert_on_fail_handler(_wifi_connect, assert_on_fail, ad, network,
                                   **wrapped_kwargs)
1589
1590
def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network and verify the result.

    Starts the connection with wifiConnectByConfig, waits up to 30 seconds
    for the connect-by-config success event, waits for the "connected"
    event, and confirms the connected ssid is the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, validate_connection is run against
                            DEFAULT_PING_ADDR after connecting and a falsy
                            result fails the test.
    """
    ssid_key = WifiEnums.SSID_KEY
    asserts.assert_true(
        ssid_key in network,
        "Key '%s' must be present in network definition." % ssid_key)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[ssid_key]
    ad.droid.wifiConnectByConfig(network)
    ad.log.info("Starting connection process to %s", expected_ssid)
    try:
        # Only the arrival of the success event matters; its content is
        # not used.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
        result = _wait_for_connect_event(ad,
                                         ssid=expected_ssid,
                                         tries=num_of_tries)
        no_result_msg = "Failed to connect to Wi-Fi network %s on %s" % (
            network, ad.serial)
        asserts.assert_true(result, no_result_msg)
        ad.log.debug("Wi-Fi connection result: %s.", result)

        connected_ssid = result['data'][ssid_key]
        wrong_network_msg = "Connected to the wrong network on %s." % ad.serial
        asserts.assert_equal(connected_ssid, expected_ssid, wrong_network_msg)
        ad.log.info("Connected to Wi-Fi network %s.", connected_ssid)

        if check_connectivity and not validate_connection(
                ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure(
                "Failed to connect to internet on %s" % expected_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s on %s" %
                     (network, ad.serial))
    except Exception as error:
        # Every other exception from the steps above is logged and
        # reported as one generic test failure.
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        ad.droid.wifiStopTrackingStateChange()
1643
1644
def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Start connection to the wifi network, with the given network Id, wait for
    the "connected" event, then verify the connected network is the one
    requested. The work is done by _wifi_connect_by_id, which is run through
    _assert_on_fail_handler so that assert_on_fail decides how a failure is
    reported.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        The value produced by _assert_on_fail_handler. The result is only
        meaningful when assert_on_fail is False: True if the connection was
        successful, False otherwise.
    """
    # Propagate the handler's result; without the return the documented
    # True/False outcome was silently discarded.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)
1667
1668
def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Start connection to the wifi network, with the given network id, wait for
    the "connected" event, then verify the connected network is the one
    requested and that validate_connection succeeds on it.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if any step fails with an exception other than
            Empty. The original exception is attached as the cause.
        Whatever asserts.fail raises when Empty is caught while waiting.
    """
    ad.droid.wifiStartTrackingStateChange()
    # Clear all previous events.
    ad.ed.clear_all_events()
    ad.droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # Only the arrival of this event matters, so its payload is not kept.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        connect_result = _wait_for_connect_event(ad,
                                                 id=network_id,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", connect_result)
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        asserts.assert_equal(
            actual_id, network_id, "Connected to the wrong network on %s."
            "Expected network id = %d, but got %d." %
            (ad.serial, network_id, actual_id))
        expected_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    expected_ssid, network_id)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as error:
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, error)
        # Chain the original error so the specific failure reason is not lost
        # behind the generic message.
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id) from error
    finally:
        # Always stop state tracking, whether the connection succeeded or not.
        ad.droid.wifiStopTrackingStateChange()
1718
1719
def wifi_connect_using_network_request(ad,
                                       network,
                                       network_specifier,
                                       num_of_tries=3):
    """Connect an Android device to a wifi network via a network request.

    Sends a network request built from the given specifier, pauses so wifi
    can begin processing it, then hands over to
    _wait_for_wifi_connect_after_network_request to drive and verify the
    rest of the connection flow.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.

    Returns:
        key: Key corresponding to network request.
    """
    request_key = ad.droid.connectivityRequestWifiNetwork(
        network_specifier, 0)
    sent_message = "Sent network request %s with %s " % (request_key,
                                                         network_specifier)
    ad.log.info(sent_message)
    # UI interaction should only start once wifi starts processing the
    # request, hence the fixed delay before continuing.
    time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
    _wait_for_wifi_connect_after_network_request(ad,
                                                 network,
                                                 request_key,
                                                 num_of_tries=num_of_tries)
    return request_key
1751
1752
def wait_for_wifi_connect_after_network_request(ad,
                                                network,
                                                key,
                                                num_of_tries=3,
                                                assert_on_fail=True):
    """Simulate and verify the connection flow after a network request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network. The work is
    done by _wait_for_wifi_connect_after_network_request, run through
    _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        The value produced by _assert_on_fail_handler. The result is only
        meaningful when assert_on_fail is False: True if the connection was
        successful, False otherwise.
    """
    # Propagate the handler's result so the documented True/False outcome
    # actually reaches the caller.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, key, num_of_tries)
1783
1784
def _wait_for_wifi_connect_after_network_request(ad,
                                                 network,
                                                 key,
                                                 num_of_tries=3):
    """Simulate and verify the connection flow after a network request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of "onMatch" events to
                      inspect before declaring failure.

    Raises:
        signals.TestFailure: if any step fails with an exception other than
            Empty. The original exception is attached as the cause.
        Whatever asserts.fail raises when Empty is caught while waiting.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # Inspect up to num_of_tries match events. The previous
        # `[0, num_of_tries]` literal always looped exactly twice.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: expected_ssid}, matched_scan_results)
            ad.log.debug("Network request on match %s", matched_network)
            if matched_network:
                break

        asserts.assert_true(matched_network,
                            "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK, 60,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_AVAILABLE))
        on_capabilities_changed = autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK, 10,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT,
             cconsts.NETWORK_CB_CAPABILITIES_CHANGED))
        # WifiInfo is attached to TransportInfo only in S.
        if ad.droid.isSdkAtLeastS():
            connected_network = on_capabilities_changed["data"][
                cconsts.NETWORK_CB_KEY_TRANSPORT_INFO]
        else:
            connected_network = ad.droid.wifiGetConnectionInfo()
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(
            connected_network[WifiEnums.SSID_KEY], expected_ssid,
            "Connected to the wrong network."
            "Expected %s, but got %s." % (network, connected_network))
    except Empty:
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        # Chain the original error so the specific failure reason is kept.
        raise signals.TestFailure(
            "Failed to connect to %s network" % network) from error
    finally:
        # Always stop state tracking, whether the connection succeeded or not.
        ad.droid.wifiStopTrackingStateChange()
1870
1871
def wifi_passpoint_connect(ad,
                           passpoint_network,
                           num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a Passpoint wifi network.

    Wait for the "connected" event, then confirm the connected ssid is the
    one requested. The work is done by _wifi_passpoint_connect, run through
    _assert_on_fail_handler so that assert_on_fail decides how a failure is
    reported.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        The value produced by _assert_on_fail_handler. The result is only
        meaningful when assert_on_fail is False, where a falsy value indicates
        that the connection failed.
    """
    # Propagate the handler's result; without the return the documented
    # outcome was silently discarded.
    return _assert_on_fail_handler(_wifi_passpoint_connect,
                                   assert_on_fail,
                                   ad,
                                   passpoint_network,
                                   num_of_tries=num_of_tries)
1900
1901
def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Connect an Android device to a Passpoint wifi network.

    Wait for the "connected" event, confirm the connected ssid is the one
    requested, then check that validate_connection succeeds on it.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if any step fails. The original exception is
            attached as the cause.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        # Pass by keyword, like the other callers in this file, so the retry
        # count cannot land in a different positional parameter.
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result, "Failed to connect to WiFi passpoint network %s on"
            " %s" % (expected_ssid, ad.serial))
        ad.log.info("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                     expected_ssid, error)
        # Chain the original error so the specific failure reason is kept.
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                  expected_ssid) from error
    finally:
        # Always stop state tracking, whether the connection succeeded or not.
        ad.droid.wifiStopTrackingStateChange()
1945
1946
def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by the given FQDN.

    Args:
        ad: android_device object to remove the configuration from.
        fqdn: FQDN passed to removePasspointConfig.

    Returns:
        True if the removal call completed, False if it raised (the error is
        logged rather than propagated).
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as removal_error:
        ad.log.error(
            "Failed to remove passpoint configuration with FQDN=%s "
            "and error=%s", fqdn, removal_error)
        return False
    else:
        return True
1957
1958
def start_wifi_single_scan(ad, scan_setting):
    """Start a wifi single shot scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the scan's onSuccess event, popped with a timeout
        of SHORT_TIMEOUT.
    """
    scan_index = ad.droid.wifiScannerStartScan(scan_setting)
    # The success event name embeds the index returned by the start call.
    success_event_name = "WifiScannerScan%sonSuccess" % scan_index
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']
1973
1974
def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection changes for up to a given number of events.

    Pops up to check_connection_count "WifiNetworkConnected" events (waiting
    up to 120 seconds for each) and checks whether any of them reports the
    given SSID.

    Args:
        ad: android_device object to track connections on.
        network_ssid: network ssid to which connection would be tracked
        check_connection_count: Integer for maximum number of network
                                connection events to check.

    Returns:
        True if a connection to the given network happens, else False.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        while check_connection_count > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
                return True
            check_connection_count -= 1
        return False
    finally:
        # Stop tracking on every exit path. Previously the success path and
        # any exception from pop_event left state tracking running.
        ad.droid.wifiStopTrackingStateChange()
1996
1997
def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out the scan duration and channel list for a scan setting.

    Channels come from the band (via wifi_chs.band_to_freq) when only "band"
    is present in the setting, or from "channels" when only that key is
    present. If both or neither are present, no channels are selected.

    Args:
        wifi_chs: Object of channels supported
        scan_setting: scan setting used for start scan
        stime_channel: scan time per channel

    Returns:
        scan_time: time required for completing a scan
        scan_channels: channel used for scanning
    """
    band_given = "band" in scan_setting
    channels_given = "channels" in scan_setting
    if band_given and not channels_given:
        channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif channels_given and not band_given:
        channels = scan_setting["channels"]
    else:
        channels = []
    total_time = len(channels) * stime_channel
    for freq in channels:
        if freq in WifiEnums.DFS_5G_FREQUENCIES:
            # Extra passive scan time for each DFS channel.
            total_time += 132
    return total_time, channels
2022
2023
def start_wifi_track_bssid(ad, track_setting):
    """Start tracking Bssid for the given settings.

    Args:
      ad: android_device object.
      track_setting: Setting for which the bssid tracking should be started.
                     Its "bssidInfos" and "apLostThreshold" entries are read.

    Returns:
      The 'data' field of the tracking onSuccess event, popped with a timeout
      of SHORT_TIMEOUT.
    """
    bssid_infos = track_setting["bssidInfos"]
    lost_threshold = track_setting["apLostThreshold"]
    tracker_index = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, lost_threshold)
    # The success event name embeds the index returned by the start call.
    success_event_name = "WifiScannerBssid{}onSuccess".format(tracker_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']
2039
2040
def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file to a DER-encoded PKCS#8 file using openssl.

    The input file must have the extension "pem". The output file must
    have the extension "der". Both are checked before openssl is run.

    Args:
        in_file: The original key file.
        out_file: The full path to the converted key file, including
        filename.
    """
    input_is_pem = in_file.endswith(".pem")
    asserts.assert_true(input_is_pem, "Input file has to be .pem.")
    output_is_der = out_file.endswith(".der")
    asserts.assert_true(output_is_der, "Output file has to be .der.")
    openssl_template = (
        "openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
        " -topk8")
    utils.exe_cmd(openssl_template.format(in_file, out_file))
2059
2060
def validate_connection(ad,
                        ping_addr=DEFAULT_PING_ADDR,
                        wait_time=15,
                        ping_gateway=True):
    """Validate internet connection by pinging the address provided.

    Waits up to wait_time seconds for the network to report connected (and,
    on Android versions above 10, for an IPv4 default gateway to be
    available), then performs an http ping. On Android versions above 10, if
    the http ping fails and ping_gateway is set, the default gateway is
    pinged instead.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.
        wait_time: maximum number of seconds to wait before validating the
                   connection.
        ping_gateway: if True, fall back to pinging the default gateway when
                      the http ping fails (Android versions above 10 only).

    Returns:
        The http ping result if it is truthy. Otherwise, when the gateway
        fallback runs, True unless the gateway ping reported 100% packet
        loss. Otherwise the falsy http ping result (False if the http ping
        raised).
    """
    android_version = int(
        ad.adb.shell("getprop ro.vendor.build.version.release"))
    # wait_time to allow for DHCP to complete.
    for _ in range(wait_time):
        if ad.droid.connectivityNetworkIsConnected():
            # Android versions above 10 additionally need a default gateway.
            if (android_version < 11
                    or ad.droid.connectivityGetIPv4DefaultGateway()):
                break
        time.sleep(1)
    ping = False
    try:
        ping = ad.droid.httpPing(ping_addr)
        ad.log.info("Http ping result: %s.", ping)
    except Exception as error:
        # Best effort: a failing http ping is treated as "no connectivity",
        # but the reason is logged rather than silently dropped.
        ad.log.warning("Http ping to %s raised an error: %s", ping_addr,
                       error)
    if android_version > 10 and not ping and ping_gateway:
        ad.log.info("Http ping failed. Pinging default gateway")
        gw = ad.droid.connectivityGetIPv4DefaultGateway()
        result = ad.adb.shell("ping -c 6 {}".format(gw))
        ad.log.info("Default gateway ping result: %s", result)
        ping = "100% packet loss" not in result
    return ping
2095
2096
# TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection info against expected values.

    Every key in expected_con except "password" must exist in the info
    returned by wifiGetConnectionInfo and hold an equal value. "BSSID" and
    "supplicant_state" are compared after lower-casing both sides.

    Args:
        ad: android_device object whose connection info is checked.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: if a field is missing or its value differs.
    """
    lowercased_fields = ["BSSID", "supplicant_state"]
    connection_info = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", connection_info)
    for field, wanted in expected_con.items():
        # Do not verify authentication related fields.
        if field == "password":
            continue
        if field not in connection_info:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." %
                (field, connection_info))
        found = connection_info[field]
        if field in lowercased_fields:
            found, wanted = found.lower(), wanted.lower()
        if found == wanted:
            continue
        raise signals.TestFailure("Expected %s to be %s, actual %s is %s." %
                                  (field, wanted, field, found))
2126
2127
def check_autoconnect_to_open_network(
        ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle WiFi and wait for the supplicant state to become "completed".

    If wifiCheckState() is already truthy, nothing is toggled and True is
    returned immediately.

    Args:
        ad: android_device object.
        conn_timeout: timeout value in sec to wait for UE to connect to a
                      WiFi AP.

    Returns:
        True if UE connects to WiFi AP (supplicant_state = completed)
        False if UE fails to complete connection within conn_timeout seconds.
    """
    if ad.droid.wifiCheckState():
        return True
    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo()['supplicant_state']
        # The deadline is checked before the state, so a state read after the
        # deadline counts as a failure even if it is "completed".
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True
2149
2150
def expand_enterprise_config_by_phase2(config):
    """Expand an enterprise config into one config per phase2 auth type.

    For PEAP configs only GTC and MSCHAPV2 are used; otherwise every member
    of WifiEnums.EapPhase2 is used. GTC is skipped when the config contains
    an FQDN entry. The input config is not modified; each result is a copy.

    Args:
        config: A dict representing enterprise config.

    Returns:
        A list of enterprise configs.
    """
    enterprise_keys = WifiEnums.Enterprise
    phase2_enum = WifiEnums.EapPhase2
    if config[enterprise_keys.EAP] == WifiEnums.Eap.PEAP:
        # Skip unsupported phase2 types for PEAP.
        candidates = [phase2_enum.GTC, phase2_enum.MSCHAPV2]
    else:
        candidates = phase2_enum
    has_fqdn = enterprise_keys.FQDN in config
    expanded = []
    for candidate in candidates:
        # Skip a special case for passpoint TTLS.
        if has_fqdn and candidate == phase2_enum.GTC:
            continue
        variant = dict(config)
        variant[enterprise_keys.PHASE2] = candidate.value
        expanded.append(variant)
    return expanded
2175
2176
def generate_eap_test_name(config, ad=None):
    """Build a test case name from an EAP configuration.

    The name is "test_connect-" followed by the EAP method name (overridden
    by "PEAP0"/"PEAP1" when the SSID contains "peap0"/"peap1", with "peap1"
    taking precedence) and, if the config has a phase2 entry matching a known
    type, "-" plus the phase2 type name.

    Args:
        config: A dict representing an EAP credential.
        ad object: Redundant but required as the same param is passed
                   to test_func in run_generated_tests

    Returns:
        A string representing the name of a generated EAP test case.
    """
    enterprise_keys = WifiEnums.Enterprise
    eap_label = next((method.name for method in WifiEnums.Eap
                      if method.value == config[enterprise_keys.EAP]), "")
    ssid_lowered = config[WifiEnums.SSID_KEY].lower()
    if "peap1" in ssid_lowered:
        eap_label = "PEAP1"
    elif "peap0" in ssid_lowered:
        eap_label = "PEAP0"
    pieces = ["test_connect-", eap_label]
    if enterprise_keys.PHASE2 in config:
        for phase2 in WifiEnums.EapPhase2:
            if phase2.value == config[enterprise_keys.PHASE2]:
                pieces.append("-{}".format(phase2.name))
                break
    return "".join(pieces)
2208
2209
def group_attenuators(attenuators):
    """Group a list of attenuators into two per-AP attenuator groups.

    Most legacy Wi-Fi setups have two attenuators each connected to a separate
    AP. The new Wi-Fi setup has four attenuators, each connected to one channel
    on an AP, so two of them are connected to one AP.

    To make the existing scripts work in the new setup, a script that needs
    to attenuate one AP must set attenuation on both attenuators connected to
    that AP. Grouping them here lets scripts work in both setups: with two
    attenuators each group gets one, with four the first two go to "AP0" and
    the last two to "AP1".

    Args:
        attenuators: A list of attenuator objects, either two or four in length.

    Returns:
        A list holding the "AP0" and "AP1" attenuator groups, in that order.

    Raises:
        asserts.fail is invoked if the attenuator list does not have two or
        four objects.
    """
    ap_groups = [
        attenuator.AttenuatorGroup("AP0"),
        attenuator.AttenuatorGroup("AP1"),
    ]
    total = len(attenuators)
    if total in (2, 4):
        # Legacy testbeds have one attenuator per AP, newer ones have two.
        per_ap = total // 2
        for position, attn in enumerate(attenuators):
            ap_groups[position // per_ap].add(attn)
    else:
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % total)
    return ap_groups
2248
2249
def set_attns(attenuator, attn_val_name, roaming_attn=ROAMING_ATTN):
    """Apply a named set of attenuation values to the first four attenuators.

    Args:
        attenuator: The attenuator object; indices 0 through 3 are used.
        attn_val_name: Name of the attenuation value pair to use.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        Any error raised while setting a value is logged and re-raised.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for port in range(4):
            attenuator[port].set_atten(roaming_attn[attn_val_name][port])
    except BaseException:
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise
2268
2269
def set_attns_steps(attenuators,
                    atten_val_name,
                    roaming_attn=ROAMING_ATTN,
                    steps=10,
                    wait_time=12):
    """Move attenuators linearly from their current values to target values.

    The current value of each attenuator is read once up front. Each step
    then sets every attenuator to the rounded linear interpolation between
    its starting value and its target, and sleeps for wait_time afterwards.

    Args:
        attenuators: The list of attenuator objects that you want to change
                     their attenuation value.
        atten_val_name: Name of the attenuation value pair to use.
        roaming_attn: Dictionary specifying the attenuation params.
        steps: Number of attenuator changes to reach the target value.
        wait_time: Sleep time for each change of attenuator.
    """
    targets = roaming_attn[atten_val_name]
    logging.info("Set attenuation values to %s in %d step(s)", targets, steps)
    initial = [attn.get_atten() for attn in attenuators]
    for step_number in range(1, steps + 1):
        fraction_done = step_number / steps
        for index, attn in enumerate(attenuators):
            delta = (targets[index] - initial[index]) * fraction_done
            attn.set_atten(round(initial[index] + delta))
        time.sleep(wait_time)
2297
2298
def trigger_roaming_and_validate(dut,
                                 attenuator,
                                 attn_val_name,
                                 expected_con,
                                 roaming_attn=ROAMING_ATTN):
    """Set attenuators to trigger roaming, then validate the new connection.

    After stepping the attenuators to the named values, verifies the DUT's
    connection info matches the expected SSID and BSSID and that
    validate_connection succeeds.

    Args:
        dut: The device under test whose connection is verified.
        attenuator: The attenuator object.
        attn_val_name: Name of the attenuation value pair to use.
        expected_con: The network information of the expected network. Its
                      SSID entry and "bssid" entry are used.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        signals.TestFailure: if validate_connection reports no connectivity.
    """
    target_ssid = expected_con[WifiEnums.SSID_KEY]
    target_bssid = expected_con["bssid"]
    # Only SSID and BSSID are compared against the live connection info.
    expected_info = {
        WifiEnums.SSID_KEY: target_ssid,
        WifiEnums.BSSID_KEY: target_bssid,
    }
    set_attns_steps(attenuator, attn_val_name, roaming_attn)

    verify_wifi_connection_info(dut, expected_info)
    logging.info("Roamed to %s successfully", target_bssid)
    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              target_bssid)
2325
2326
def create_softap_config():
    """Create a softap config with a random ssid and password.

    Returns:
        A dict holding the generated SSID (prefixed with "softap_") and
        password under WifiEnums.SSID_KEY and WifiEnums.PWD_KEY.
    """
    ssid = "softap_" + utils.rand_ascii_str(8)
    passphrase = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", ssid, passphrase)
    return {
        WifiEnums.SSID_KEY: ssid,
        WifiEnums.PWD_KEY: passphrase,
    }
2337
2338
def start_softap_and_verify(ad, band):
    """Bring-up softap and verify AP mode and in scan results.

    Registers a SoftAp callback, checks the reported frequency and bandwidth
    are 0 before start, starts tethering with a random config, verifies the
    AP is enabled and visible to the client, then checks the reported
    frequency and bandwidth are positive.

    Args:
        ad: Object exposing `dut` (device hosting the softAP) and
            `dut_client` (device scanning for it) attributes.
        band: The band to use for softAP.

    Returns: dict, the softAP config.

    """
    # Register before start the test.
    callbackId = ad.dut.droid.registerSoftApCallback()
    try:
        # Check softap info value is default
        frequency, bandwidth = get_current_softap_info(ad.dut, callbackId,
                                                       True)
        asserts.assert_true(frequency == 0, "Softap frequency is not reset")
        asserts.assert_true(bandwidth == 0, "Softap bandwdith is not reset")

        config = create_softap_config()
        start_wifi_tethering(ad.dut,
                             config[WifiEnums.SSID_KEY],
                             config[WifiEnums.PWD_KEY],
                             band=band)
        asserts.assert_true(ad.dut.droid.wifiIsApEnabled(),
                            "SoftAp is not reported as running")
        start_wifi_connection_scan_and_ensure_network_found(
            ad.dut_client, config[WifiEnums.SSID_KEY])

        # Check softap info can get from callback succeed and assert value
        # should be valid.
        frequency, bandwidth = get_current_softap_info(ad.dut, callbackId,
                                                       True)
        asserts.assert_true(frequency > 0, "Softap frequency is not valid")
        asserts.assert_true(bandwidth > 0, "Softap bandwdith is not valid")
    finally:
        # Unregister on every exit path so a failed check does not leave the
        # callback registered.
        ad.dut.droid.unregisterSoftApCallback(callbackId)

    return config
2374
2375
def wait_for_expected_number_of_softap_clients(ad, callbackId,
                                               expected_num_of_softap_clients):
    """Wait for a softap client-count event and validate its contents.

    Pops the next "number of clients changed" event for the given callback
    (waiting up to SHORT_TIMEOUT) and asserts that both the reported count
    and the number of reported MAC addresses equal the expected value, and
    that every reported MAC address is well formed.

    Args:
        ad: Device object whose event dispatcher (`ad.ed`) is polled.
        callbackId: Id of the callback associated with registering.
        expected_num_of_softap_clients: expected number of softap clients.
    """
    event_name = "{}{}{}".format(
        wifi_constants.SOFTAP_CALLBACK_EVENT, str(callbackId),
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    payload = ad.ed.pop_event(event_name, SHORT_TIMEOUT)['data']
    reported_count = payload[
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    reported_macs = payload[wifi_constants.SOFTAP_CLIENTS_MACS_CALLBACK_KEY]

    asserts.assert_equal(
        reported_count, expected_num_of_softap_clients,
        "The number of softap clients doesn't match the expected number")
    asserts.assert_equal(
        len(reported_macs), expected_num_of_softap_clients,
        "The number of mac addresses doesn't match the expected number")
    for mac in reported_macs:
        asserts.assert_true(checkMacAddress(mac),
                            "An invalid mac address was returned")
2398
2399
def checkMacAddress(input):
    """Validate whether a string is a valid mac address or not.

    A valid address is six two-digit hexadecimal groups (case-insensitive)
    that are either all separated by ':', all separated by '-', or not
    separated at all.

    Args:
        input: The string to validate.

    Returns: True/False, returns true for a valid mac address and false otherwise.
    """
    # The back-reference \1 forces every separator to equal the first one.
    macValidationRegex = r"[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\1[0-9a-f]{2}){4}"
    # fullmatch is used instead of match + '$' because '$' also matches just
    # before a trailing newline, which would accept "aa:bb:cc:dd:ee:ff\n".
    return re.fullmatch(macValidationRegex, input.lower()) is not None
2412
2413
def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
    """Wait for a softap state-change event and check the reported state.

    Args:
        ad: Device object whose event dispatcher (`ad.ed`) is polled.
        callbackId: Id of the callback associated with registering.
        expected_softap_state: The expected softap state.
    """
    event_name = "{}{}{}".format(wifi_constants.SOFTAP_CALLBACK_EVENT,
                                 str(callbackId),
                                 wifi_constants.SOFTAP_STATE_CHANGED)
    state_event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
    reported_state = state_event['data'][
        wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY]
    asserts.assert_equal(reported_state, expected_softap_state,
                         "Softap state doesn't match with expected state")
2427
2428
def get_current_number_of_softap_clients(ad, callbackId):
    """Drain all queued softap client-count events and report the latest.

    Args:
        ad: Device object whose event dispatcher (`ad.ed`) is drained.
        callbackId: Id of the callback associated with registering.

    Returns:
        The client count carried by the last queued event, or None when
        the queue held no matching event.
    """
    event_name = "{}{}{}".format(
        wifi_constants.SOFTAP_CALLBACK_EVENT, str(callbackId),
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    queued = ad.ed.pop_all(event_name)
    if not queued:
        return None

    latest_count = None
    # Later events overwrite earlier ones, leaving the most recent count.
    for item in queued:
        latest_count = item['data'][
            wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    return latest_count
2447
2448
def get_current_softap_info(ad, callbackId, need_to_wait):
    """Drain softap info-changed events and return the latest values.

    Args:
        ad: Device object providing the event dispatcher (`ad.ed`) and
            logger (`ad.log`).
        callbackId: Id of the callback associated with registering.
        need_to_wait: When true, first wait (up to SHORT_TIMEOUT) for one
            info event before draining the rest of the queue.

    Returns:
        Tuple (frequency, bandwidth) from the most recent event. Both are 0
        when no event was consumed.
    """

    def _freq_and_bw(info_event):
        # Pull the two reported fields out of one callback event.
        return (
            info_event['data'][
                wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY],
            info_event['data'][
                wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY],
        )

    event_name = "{}{}{}".format(wifi_constants.SOFTAP_CALLBACK_EVENT,
                                 str(callbackId),
                                 wifi_constants.SOFTAP_INFO_CHANGED)
    ad.log.debug("softap info dump from eventStr %s", event_name)

    frequency, bandwidth = 0, 0
    if need_to_wait:
        frequency, bandwidth = _freq_and_bw(
            ad.ed.pop_event(event_name, SHORT_TIMEOUT))
        ad.log.info("softap info updated, frequency is %s, bandwidth is %s",
                    frequency, bandwidth)

    # Anything still queued is newer than what was read above.
    for queued_event in ad.ed.pop_all(event_name):
        frequency, bandwidth = _freq_and_bw(queued_event)

    ad.log.info("softap info, frequency is %s, bandwidth is %s", frequency,
                bandwidth)
    return frequency, bandwidth
2480
def get_current_softap_infos(ad, callbackId, need_to_wait):
    """pop up all of softap info list changed event from queue.

    Args:
        ad: Device object providing the event dispatcher (`ad.ed`) and
            logger (`ad.log`).
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the info callback event before pop all.

    Returns:
        The 'data' payload of the most recent info-list event. An empty
        list is returned when need_to_wait is false and no matching event
        was queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_INFOLIST_CHANGED
    ad.log.debug("softap info dump from eventStr %s", eventStr)

    # Start from an empty list so the function returns cleanly (instead of
    # hitting an unbound local) when nothing was waited for and the queue
    # holds no matching event.
    infos = []
    if need_to_wait:
        infos = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    # Later events supersede earlier ones.
    for event in ad.ed.pop_all(eventStr):
        infos = event['data']

    for info in infos:
        ad.log.info(
            "softap info, freq:%s, bw:%s, wifistandard:%s, bssid:%s",
            info[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY],
            info[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY],
            info[wifi_constants.SOFTAP_INFO_WIFISTANDARD_CALLBACK_KEY],
            info[wifi_constants.SOFTAP_INFO_BSSID_CALLBACK_KEY])

    return infos
2515
def get_current_softap_capability(ad, callbackId, need_to_wait):
    """pop up all of softap capability changed event from queue.

    Args:
        ad: Device object providing the event dispatcher (`ad.ed`) and
            logger (`ad.log`).
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the capability callback event before pop all.

    Returns:
        The 'data' payload of the most recent capability event, or None
        when need_to_wait is false and no matching event was queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_CAPABILITY_CHANGED
    ad.log.debug("softap capability dump from eventStr %s", eventStr)

    # Default to None so an empty queue without waiting yields a defined
    # result rather than an unbound local error.
    capability = None
    if need_to_wait:
        capability = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    # Later events supersede earlier ones.
    for event in ad.ed.pop_all(eventStr):
        capability = event['data']

    return capability
2536
def get_ssrdumps(ad):
    """Pull any files found in the device ssrdump dir, then delete them.

    Files are copied into "SSRDUMPS_<serial>" under ad.device_log_path. The
    on-device files are deleted afterwards whether or not any were pulled.

    Args:
        ad: android device object.
    """
    ssr_dir = "/data/vendor/ssrdump/"
    dump_files = ad.get_file_names(ssr_dir)
    if dump_files:
        ad.log.info("Pulling ssrdumps %s", dump_files)
        destination = os.path.join(ad.device_log_path,
                                   "SSRDUMPS_%s" % ad.serial)
        os.makedirs(destination, exist_ok=True)
        ad.pull_files(dump_files, destination)
    ad.adb.shell("find " + ssr_dir + " -type f -delete", ignore_status=True)
2550
2551
def start_pcap(pcap, wifi_band, test_name):
    """Start packet capture in monitor mode.

    Captures are written below a 'PacketCapture' directory inside the
    current test context's output path; the directory is created if needed.

    Args:
        pcap: packet capture object
        wifi_band: '2g' or '5g' or 'dual'
        test_name: test name to be used for pcap file name

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, log directory joined with test_name) as the
        value
    """
    output_root = context.get_current_context().get_full_output_path()
    log_dir = os.path.join(output_root, 'PacketCapture')
    os.makedirs(log_dir, exist_ok=True)

    # 'dual' expands to one capture per band.
    bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, log_dir, test_name),
               os.path.join(log_dir, test_name))
        for band in bands
    }
2576
2577
def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since, the pcap logs in monitor mode can be very large, we will
    delete them if they are not required. 'test_status' if True, will delete
    the pcap files. If False, we will keep them.

    Args:
        pcap: packet capture object
        procs: dictionary returned by start_pcap; each value is a
            (process, capture file path) tuple. May be empty.
        test_status: status of the test case; a truthy value removes the
            directory that contains each capture file.
    """
    capture_dirs = []
    for proc, fname in procs.values():
        pcap.stop_packet_capture(proc)
        capture_dir = os.path.dirname(fname)
        # Bands started together share one directory; remember it once.
        if capture_dir not in capture_dirs:
            capture_dirs.append(capture_dir)

    if test_status:
        # Iterating the collected directories (rather than reusing the loop
        # variable) keeps an empty 'procs' from raising and covers every
        # capture, not only the last one.
        for capture_dir in capture_dirs:
            shutil.rmtree(capture_dir)
2595
2596
def verify_mac_not_found_in_pcap(ad, mac, packets):
    """Verify that a mac address is not found in the captured packets.

    Fails the test on the first packet whose summary contains the address.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    for pkt in packets:
        summary = pkt.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac in summary:
            # show() without dump=True prints the packet and returns None,
            # which left "Packet = None" in the failure message.
            # NOTE(review): assumes scapy packets, as the rdpcap mention in
            # the docstring suggests - confirm.
            asserts.fail("Device %s caught Factory MAC: %s in packet sniffer."
                         "Packet = %s" %
                         (ad.serial, mac, pkt.show(dump=True)))
2610
2611
def verify_mac_is_found_in_pcap(ad, mac, packets):
    """Verify that a mac address is found in the captured packets.

    Passes as soon as one packet summary contains the address; fails the
    test when none does.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    # any() stops at the first matching packet, like an early return.
    if any(mac in packet.summary() for packet in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer."
                 "for device %s" % (mac, ad.serial))
2625
def start_all_wlan_logs(ads):
    """Start Pixel Logger wlan logging on every device in `ads`.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        start_wlan_logs(device)
2629
def start_wlan_logs(ad):
    """Start Pixel Logger to record extra wifi logs.

    Does nothing beyond logging when the Pixel Logger package is not
    listed on the device. Otherwise clears previously collected wlan logs
    and starts the logging service, selecting the "cnss_diag" logger when
    /vendor/bin/cnss_diag exists and "wlan_logs" otherwise.

    Args:
        ad: android device object.
    """
    has_pixel_logger = ad.adb.shell(
        "pm list package | grep com.android.pixellogger")
    if not has_pixel_logger:
        ad.log.info("Device doesn't have Pixel Logger")
        return

    # Remove stale logs so only this run's files are left to collect.
    ad.adb.shell(
        "find /sdcard/Android/data/com.android.pixellogger/files/logs"
        "/wlan_logs/ -type f -delete",
        ignore_status=True)

    if ad.file_exists("/vendor/bin/cnss_diag"):
        logger_name = "cnss_diag"
    else:
        logger_name = "wlan_logs"
    start_cmd = ("am startservice -a com.android.pixellogger.service"
                 ".logging.LoggingService.ACTION_START_LOGGING "
                 "-e intent_logger " + logger_name)
    ad.adb.shell(start_cmd, ignore_status=True)
2652
def stop_all_wlan_logs(ads):
    """Stop Pixel Logger on every device, then wait for the log archives.

    Args:
        ads: iterable of android device objects. When it is empty nothing
            is stopped and the function returns immediately.
    """
    last_ad = None
    for ad in ads:
        stop_wlan_logs(ad)
        last_ad = ad

    # With no devices there is nothing to wait for, and no device logger to
    # report through (the loop variable would be unbound).
    if last_ad is None:
        return

    last_ad.log.info("Wait 30s for the createion of zip file for wlan logs")
    time.sleep(30)
2658
def stop_wlan_logs(ad):
    """Stop Pixel Logger logging on the device.

    Nothing is sent when the Pixel Logger package is not listed on the
    device.

    Args:
        ad: android device object.
    """
    has_pixel_logger = ad.adb.shell(
        "pm list package | grep com.android.pixellogger")
    if has_pixel_logger:
        stop_cmd = ("am startservice -a com.android.pixellogger.service"
                    ".logging.LoggingService.ACTION_STOP_LOGGING")
        ad.adb.shell(stop_cmd, ignore_status=True)
2670
def get_wlan_logs(ad):
    """Pull logs from the Pixel Logger wlan_logs folder.

    Files are copied into "WLAN_LOGS_<serial>" under ad.device_log_path.
    Nothing happens when the folder lists no files.

    Args:
        ad: android device object.
    """
    wlan_files = ad.get_file_names(
        "/sdcard/Android/data/com.android.pixellogger/files/logs/wlan_logs")
    if not wlan_files:
        return

    ad.log.info("Pulling Pixel Logger logs %s", wlan_files)
    destination = os.path.join(ad.device_log_path,
                               "WLAN_LOGS_%s" % ad.serial)
    os.makedirs(destination, exist_ok=True)
    ad.pull_files(wlan_files, destination)
2682
# Outcome of one `cmd wifi send-link-probe` call: whether it succeeded, the
# raw command output, and the number parsed from that output (stored as
# elapsed_time on success or failure_reason on failure).
LinkProbeResult = namedtuple(
    'LinkProbeResult',
    ['is_success', 'stdout', 'elapsed_time', 'failure_reason'])
2686
2687
def send_link_probe(ad):
    """Send one link probe to the currently connected AP and parse the result.

    The test is failed when the command output contains 'Error' or
    'Exception', or when it reports neither success nor failure.

    Args:
         ad: android device object
    Returns:
        LinkProbeResult namedtuple. On success elapsed_time holds the first
        all-digit token of the output; on failure failure_reason holds it.
        The field is None when the output has no such token.
    """
    stdout = ad.adb.shell('cmd wifi send-link-probe')
    saw_error = 'Error' in stdout or 'Exception' in stdout
    asserts.assert_false(saw_error,
                         'Exception while sending link probe: ' + stdout)

    def _first_number():
        # First whitespace-separated token made only of digits, as an int.
        for token in stdout.split():
            if token.isdigit():
                return int(token)
        return None

    succeeded = False
    elapsed = None
    reason = None
    if 'succeeded' in stdout:
        succeeded = True
        elapsed = _first_number()
    elif 'failed with reason' in stdout:
        reason = _first_number()
    else:
        asserts.fail('Unexpected link probe result: ' + stdout)

    return LinkProbeResult(is_success=succeeded,
                           stdout=stdout,
                           elapsed_time=elapsed,
                           failure_reason=reason)
2718
2719
def send_link_probes(ad, num_probes, delay_sec):
    """Send several link probes in a row to the currently connected AP.

    A delay follows every probe, including the last one.

    Args:
         ad: android device object
         num_probes: number of probes to perform
         delay_sec: delay time after each probe, in seconds
    Returns:
        List[LinkProbeResult] one LinkProbeResults for each probe
    """
    logging.info('Sending link probes')
    collected = []
    remaining = num_probes
    while remaining > 0:
        # send_link_probe() itself fails the test when the adb shell output
        # contains an exception.
        outcome = send_link_probe(ad)
        logging.info(f'link probe results: {outcome}')
        collected.append(outcome)
        time.sleep(delay_sec)
        remaining -= 1

    return collected
2742
2743
def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Restart an AP with the provided network info.

    Closes test.access_points[index], waits five seconds, then starts `ap`
    with a 'whirlwind' preset. WPA security is used when the network dict
    has a "password" entry, otherwise the AP is started without security.

    Args:
        test: the calling test class object.
        index: int, index of the AP.
        ap: access_point object of the AP.
        network: dict with information of the network, including ssid,
                 password and bssid.
        bandwidth: the operation bandwidth for the AP, default 80MHz.
        channel: the channel number for the AP.

    Returns:
        None.
    """
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    time.sleep(5)

    # Configure AP as required.
    if "password" in network:
        security = hostapd_security.Security(security_mode="wpa",
                                             password=network["password"])
    else:
        security = hostapd_security.Security(security_mode=None, password=None)

    ap.start_ap(
        hostapd_ap_preset.create_ap_preset(channel=channel,
                                           ssid=ssid,
                                           security=security,
                                           bss_settings=[],
                                           vht_bandwidth=bandwidth,
                                           profile_name='whirlwind',
                                           iface_wlan_2g=ap.wlan_2g,
                                           iface_wlan_5g=ap.wlan_5g))
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))
2780
2781
def turn_ap_off(test, AP):
    """Bring down hostapd on the Access Point.

    Stops the hostapd instance of the 'wlan0' and then the 'wlan1'
    interface, skipping any that is not alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based index into
            test.access_points).
    """
    for iface in ('wlan0', 'wlan1'):
        daemon = test.access_points[AP - 1]._aps[iface].hostapd
        if not daemon.is_alive():
            continue
        daemon.stop()
        logging.debug('Turned %s AP%d off' % (iface.upper(), AP))
2796
2797
def turn_ap_on(test, AP):
    """Bring up hostapd on the Access Point.

    Starts the hostapd instance of the 'wlan0' and then the 'wlan1'
    interface with its own stored config, skipping any that is already
    alive.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based index into
            test.access_points).
    """
    for iface in ('wlan0', 'wlan1'):
        daemon = test.access_points[AP - 1]._aps[iface].hostapd
        if daemon.is_alive():
            continue
        daemon.start(daemon.config)
        logging.debug('Turned %s AP%d on' % (iface.upper(), AP))
2812
2813
def turn_location_off_and_scan_toggle_off(ad):
    """Turn off the location service and the always-available wifi scan.

    Asserts afterwards that the device no longer reports the scanner as
    always available.

    Args:
        ad: android device object.
    """
    utils.set_location_service(ad, False)
    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    still_available = ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(not still_available,
                        "Failed to turn off location service's scan.")
2820
2821
def set_softap_channel(dut, ap_iface='wlan1', cs_count=10, channel=2462):
    """Switch the SoftAP channel through hostapd_cli chan_switch.

    Fails the test unless the command output is exactly 'OK'.

    Args:
        dut: android device object
        ap_iface: interface of SoftAP mode.
        cs_count: how many beacon frames before switch channel, default = 10
        channel: value passed as the last chan_switch argument.
            NOTE(review): the default 2462 looks like a frequency in MHz
            rather than a channel number - confirm.

    Returns:
        The command output ('OK') when the switch succeeded.
    """
    command = 'hostapd_cli -i {} chan_switch {} {}'.format(
        ap_iface, cs_count, channel)
    dut.log.info('adb shell {}'.format(command))
    outcome = dut.adb.shell(command)
    if outcome != 'OK':
        asserts.fail("Failed to switch hotspot channel")
        return None
    dut.log.info('switch hotspot channel to {}'.format(channel))
    return outcome
2841
def get_wlan0_link(dut):
    """Query wpa_cli for the wlan0 status and return it as a dict.

    Args:
        dut: android device object.

    Returns:
        dict built from the key=value pairs of the status output. The test
        is failed when the output has no "ssid" entry.
    """
    status_text = dut.adb.shell(
        'wpa_cli -iwlan0 -g@android:wpa_wlan0 IFNAME=wlan0 status')
    # Values are either a double-quoted string or a run of non-whitespace.
    link_info = dict(re.findall(r'(\S+)=(".*?"|\S+)', status_text))
    asserts.assert_true("ssid" in link_info,
                        "Client doesn't connect to any network")
    return link_info
2850
def verify_11ax_wifi_connection(ad, wifi6_supported_models, wifi6_ap):
    """Verify 11ax for wifi connection.

    The check only runs when the AP supports 11ax and the device model is
    in the supported list; otherwise the function returns without checking.

    Args:
      ad: android device object
      wifi6_supported_models: device models supporting 11ax.
      wifi6_ap: if the AP supports 11ax.
    """
    if not wifi6_ap or ad.model not in wifi6_supported_models:
        return

    logging.info("Verifying 11ax. Model: %s" % ad.model)
    connected_standard = ad.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        connected_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT did not connect to 11ax.")
2864
def verify_11ax_softap(dut, dut_client, wifi6_supported_models):
    """Verify 11ax SoftAp if devices support it.

    When both the softap device and the client are supported models, the
    client's connection standard must be 11ax. Otherwise nothing is
    checked.

    Args:
      dut: Softap device.
      dut_client: Client connecting to softap.
      wifi6_supported_models: List of device models supporting 11ax.
    """
    both_supported = (dut.model in wifi6_supported_models
                      and dut_client.model in wifi6_supported_models)
    if not both_supported:
        return

    logging.info(
        "Verifying 11ax softap. DUT model: %s, DUT Client model: %s",
        dut.model, dut_client.model)
    client_standard = dut_client.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        client_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT failed to start SoftAp in 11ax.")
2884
def check_available_channels_in_bands_2_5(dut, country_code):
    """Check if DUT is capable of enable BridgedAp.
    #TODO: Find a way to make this function flexible by taking an argument.

    Check points:
        1. Check the DUT support by calling Android API.
        2. Check the dual SAP bands support by changing DUT to the given country_code.

    Note that Wi-Fi is toggled on and then off as part of check point 2.

    Args:
        dut: android device object.
        country_code: country code, e.g., 'US', 'JP'.
    Returns:
        True: If DUT is capable of enable BridgedAp.
        False: If DUT is not capable of enable BridgedAp.
    """
    # Check point #1
    is_bridged_ap_supported = dut.droid.wifiIsBridgedApConcurrencySupported()
    if not is_bridged_ap_supported:
        logging.error("DUT %s doesn't support bridged AP.", dut.model)
        return False

    # Check point #2
    set_wifi_country_code(dut, country_code)
    country = dut.droid.wifiGetCountryCode()
    dut.log.info("DUT current country code : {}".format(country))
    # Wi-Fi ON and OFF to make sure country code take effect.
    wifi_toggle_state(dut, True)
    wifi_toggle_state(dut, False)

    # Register SoftAp Callback and get SoftAp capability.
    callbackId = dut.droid.registerSoftApCallback()
    try:
        capability = get_current_softap_capability(dut, callbackId, True)
    finally:
        # Unregister even if waiting for the capability event raises, so
        # the callback does not stay registered on the device.
        dut.droid.unregisterSoftApCallback(callbackId)

    # Both channel lists must be non-empty for dual-band softAP.
    if capability[wifi_constants.
                  SOFTAP_CAPABILITY_24GHZ_SUPPORTED_CHANNEL_LIST] and \
        capability[wifi_constants.
                   SOFTAP_CAPABILITY_5GHZ_SUPPORTED_CHANNEL_LIST]:
        return True

    logging.error("DUT in %s doesn't support dual SAP bands (2G and 5G).", country_code)
    return False
2926
2927
@retry(tries=5, delay=2)
def validate_ping_between_two_clients(dut1, dut2):
    """Make 2 DUT ping each other.

    Reads the first IPv4 address of wlan0 on each device, then pings in
    both directions. The test is failed when either address is missing or
    either ping fails. The whole function is wrapped in
    @retry(tries=5, delay=2).

    Args:
        dut1: An AndroidDevice object.
        dut2: An AndroidDevice object.
    """
    # Get DUTs' IPv4 addresses; an empty string marks "not available".
    addresses = []
    for device in (dut1, dut2):
        try:
            addresses.append(
                device.droid.connectivityGetIPv4Addresses('wlan0')[0])
        except IndexError:
            device.log.info(
                "{} has no Wi-Fi connection, cannot get IPv4 address."
                .format(device.serial))
            addresses.append("")
    dut1_ip, dut2_ip = addresses

    # Test fail if not able to obtain two DUT's IPv4 addresses.
    asserts.assert_true(dut1_ip and dut2_ip,
                        "Ping failed because no DUT's IPv4 address")

    dut1.log.info("{} IPv4 addresses : {}".format(dut1.serial, dut1_ip))
    dut2.log.info("{} IPv4 addresses : {}".format(dut2.serial, dut2_ip))

    # Two clients ping each other, dut1 first.
    directions = ((dut1, dut1_ip, dut2_ip), (dut2, dut2_ip, dut1_ip))
    for sender, sender_ip, target_ip in directions:
        sender.log.info("{} ping {}".format(sender_ip, target_ip))
        asserts.assert_true(
            utils.adb_shell_ping(sender, count=10, dest_ip=target_ip,
                                 timeout=20),
            "%s ping %s failed" % (sender.serial, target_ip))
2970
def get_wear_wifimediator_disable_status(ad):
    """Gets WearWifiMediator disable status.

    Reads the global setting cw_disable_wifimediator; only the exact value
    "1" counts as disabled.

    Args:
        ad: Android Device

    Returns:
        True if WearWifiMediator is disabled, False otherwise.
    """
    raw_setting = ad.adb.shell("settings get global cw_disable_wifimediator")
    if raw_setting != "1":
        ad.log.info("WearWifiMediator is ENABLED")
        return False
    ad.log.info("WearWifiMediator is DISABLED")
    return True
2988
def disable_wear_wifimediator(ad, state):
    """Disable or enable WearWifiMediator and verify the resulting setting.

    Writes the global setting cw_disable_wifimediator and asserts that it
    reads back as requested.

    Args:
        ad: Android Device
        state: True to disable, False otherwise.
    """
    if not state:
        ad.log.info("Enabling WearWifiMediator.....")
        ad.adb.shell("settings put global cw_disable_wifimediator 0")
        asserts.assert_false(get_wear_wifimediator_disable_status(ad),
                             "WearWifiMediator should be enabled")
        return

    ad.log.info("Disabling WearWifiMediator.....")
    ad.adb.shell("settings put global cw_disable_wifimediator 1")
    asserts.assert_true(get_wear_wifimediator_disable_status(ad),
                        "WearWifiMediator should be disabled")
3006
def list_scan_results(ad, wait_time=15):
    """
    Initiates an Android Wi-Fi scan and logs the available Wi-Fi networks.

    The scan results are only written to the device log; nothing is
    returned.

    Args:
        ad (AndroidDevice): The Android device on which the scan is performed.
        wait_time (int, optional):
          The time in seconds to wait for the scan to complete before fetching results.
          Default is 15 seconds.
    """
    device_log = ad.log
    device_log.info("Start scan for available Wi-Fi networks...")
    ad.adb.shell("cmd wifi start-scan")

    device_log.info("Wait %ss for scan to complete.", wait_time)
    time.sleep(wait_time)

    networks = ad.adb.shell("cmd wifi list-scan-results")
    device_log.info("Available Wi-Fi networks: \n" + networks + "\n")
3023
def kill_iperf3_server_by_port(port: str):
    """
        Kill an iperf3 server process running on the specified port.

        Processes are found by scanning `ps aux`; a process is signalled
        with SIGTERM (kill -15) when its command line contains "iperf3" and
        the port as a standalone number.

        Args:
            port: The port number on which the iperf3 server is running.
    """
    # Match the port only as a whole number so that e.g. 520 does not
    # match 5201.
    port_pattern = re.compile(r"(?<!\d)%s(?!\d)" % re.escape(str(port)))
    try:
        ps_output = subprocess.check_output(["ps", "aux"],
                                            universal_newlines=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable `ps` binary.
        logging.info("Error executing shell command with subprocess.")
        return

    for line in ps_output.splitlines():
        # `ps aux` prints 10 fixed columns followed by the command line;
        # limiting the split keeps the command (with its spaces) intact.
        columns = line.split(None, 10)
        if len(columns) < 11:
            continue
        pid, command = columns[1], columns[10]
        # Only the command column is inspected, so a PID or memory figure
        # that happens to contain the port digits cannot cause a kill.
        if "iperf3" not in command or not port_pattern.search(command):
            continue
        subprocess.run(["kill", "-15", pid])
        logging.warning(f"iperf3 server on port {port} already in use,"
                      f"kill it with PID {pid}")
3043
def get_host_public_ipv4_address() -> Optional[str]:
  """Find the host's first non-private IPv4 address via ifconfig.

  The output of `ifconfig` is scanned for addresses that follow the word
  'inet'; the first one that `ipaddress` does not classify as private is
  returned.

  Returns:
    str: The public IPv4 address, if found.
    None: If no public IPv4 address is found, or if running ifconfig
      failed for any reason (failures are logged, not raised).
  """
  try:
    ifconfig_text = subprocess.check_output(["ifconfig"],
                                            universal_newlines=True)
  except subprocess.CalledProcessError:
    logging.info("Error executing ifconfig command.")
    return None
  except FileNotFoundError:
    logging.info("ifconfig command not found.")
    return None
  except Exception as e:
    logging.info("An unexpected error occurred: %s", e)
    return None

  # Dotted-quad candidates that come right after the word 'inet'.
  candidates = re.findall(r'inet (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                          ifconfig_text)

  for candidate in candidates:
    try:
      parsed = ipaddress.ip_address(candidate)
    except ValueError:
      # The regex allows octets above 255; skip those.
      logging.info("Invalid IP address format: %s", candidate)
      continue
    if not parsed.is_private:
      return candidate

  return None
3090
def get_iperf_server_port():
  """Gets a port number within the Dynamic port range (49152-65535).

  This function first determines which ports are currently in use, and then
  selects a random port from the dynamic range that is not in use. The port
  is not reserved, so another process may still claim it before the caller
  binds to it.

  Returns:
    int: A port number that was not reported as in use.

  Raises:
    RuntimeError: If no available port is found in the Dynamic Ports range.
  """

  def get_used_ports():
    """Retrieve the ports that are currently in use on the system.

    This function uses the 'netstat' command to determine which ports are
    currently in use, and then parses the output to extract the port numbers.

    Returns:
        set[int]: Ports currently in use; empty when netstat could not be
        run or parsed (the error is logged).
    """
    try:
      # Get the output from the `netstat` command.
      output = subprocess.check_output(['netstat', '-tuln']).decode('utf-8')

      # Every run of digits that follows a ':' is treated as a port. This
      # may over-report, which only shrinks the candidate pool.
      return {int(port) for port in re.findall(r'(?<=:)\d+', output)}
    except Exception as e:
      logging.error(f"Error: {e}")
      return set()

  # Define the range for Dynamic Ports.
  dynamic_ports_range = list(range(49152, 65536))
  # A set keeps each membership test below O(1) across ~16k candidates.
  used_ports = get_used_ports()

  # Randomly shuffle the ports and then find one that's not in use.
  random.shuffle(dynamic_ports_range)

  for port in dynamic_ports_range:
    if port not in used_ports:
      return port

  raise RuntimeError("No available port found in the Dynamic Ports range!")
3139