# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import ping_runner
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server import hosts
from autotest_lib.server import site_linux_router
from autotest_lib.server import site_linux_system
from autotest_lib.server.cros import dnsname_mangler
from autotest_lib.server.cros.network import attenuator_controller
from autotest_lib.server.cros.network import wifi_client


class WiFiTestContextManager(object):
    """A context manager for state used in WiFi autotests.

    Some of the building blocks we use in WiFi tests need to be cleaned up
    after use.  For instance, we start an XMLRPC server on the client
    which should be shut down so that the next test can start its instance.
    It is convenient to manage this setup and teardown through a context
    manager rather than building it into the test class logic.

    The object can be used either in a `with` statement or by calling
    setup() and teardown() explicitly.

    """
    # Keys looked up in the cmdline_args dict handed to the constructor.
    CMDLINE_ATTEN_ADDR = 'atten_addr'
    CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
    CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
    CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
    CMDLINE_ROUTER_ADDR = 'router_addr'
    CMDLINE_PACKET_CAPTURES = 'packet_capture'
    CMDLINE_USE_WPA_CLI = 'use_wpa_cli'


    @property
    def attenuator(self):
        """@return attenuator object (e.g. a BeagleBone).

        @raises error.TestNAError if no attenuator was found during setup().

        """
        if self._attenuator is None:
            raise error.TestNAError('No attenuator available in this setup.')

        return self._attenuator


    @property
    def client(self):
        """@return WiFiClient object abstracting the DUT."""
        return self._client_proxy


    @property
    def router(self):
        """@return router object (e.g. a LinuxCrosRouter)."""
        return self._router


    @property
    def pcap_host(self):
        """@return Dedicated packet capture host or None."""
        return self._pcap_host


    @property
    def capture_host(self):
        """@return Dedicated pcap_host or the router itself."""
        return self.pcap_host or self.router


    def __init__(self, test_name, host, cmdline_args, debug_dir):
        """Construct a WiFiTestContextManager.

        The router and attenuator addresses may be overridden through
        cmdline_args (see the CMDLINE_* keys); they are consumed in setup().

        @param test_name string descriptive name for this test.
        @param host host object representing the DUT.
        @param cmdline_args dict of key, value settings from command line.
        @param debug_dir passed through to wifi_client.WiFiClient
                (presumably the directory for client debug artifacts).

        """
        super(WiFiTestContextManager, self).__init__()
        self._test_name = test_name
        # Copy so later changes by the caller do not affect this context.
        self._cmdline_args = cmdline_args.copy()
        self._client_proxy = wifi_client.WiFiClient(
                host, debug_dir,
                self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
        # The remaining members are populated by setup().
        self._attenuator = None
        self._router = None
        self._pcap_host = None
        self._enable_client_packet_captures = False
        self._enable_packet_captures = False
        self._packet_capture_snaplen = None


    def __enter__(self):
        """Run setup() and return this context.

        Python does not invoke __exit__ when __enter__ raises, so a failed
        setup() would otherwise leave the client proxy created in __init__
        (and any hosts created so far) open.  Tear down whatever exists
        before propagating the original setup error.

        """
        try:
            self.setup()
        except Exception:
            self._teardown_after_failed_setup()
            raise
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """Run teardown(); exceptions from the `with` body still propagate."""
        self.teardown()


    def _teardown_after_failed_setup(self):
        """Best-effort teardown that never masks the original setup error."""
        try:
            self.teardown()
        except Exception:
            logging.exception('Teardown after a failed setup also failed.')


    def _get_bool_cmdline_value(self, key, default_value):
        """Returns a bool value for the given key from the cmdline args.

        @param key string cmdline args key.
        @param default_value value to return if the key is not specified in the
               cmdline args.

        @return True/False or default_value if key is not specified in the
                cmdline args.

        """
        if key not in self._cmdline_args:
            return default_value
        # Any value other than the accepted "true" spellings means False.
        return self._cmdline_args[key].lower() in ('1', 'true', 'yes', 'y')


    def get_wifi_addr(self, ap_num=0):
        """Return an IPv4 address pingable by the client on the WiFi subnet.

        @param ap_num int number of AP.  Only used in stumpy cells.
        @return string IPv4 address.

        """
        return self.router.local_server_address(ap_num)


    def get_wifi_if(self, ap_num=0):
        """Returns the interface name for the IP address of self.get_wifi_addr.

        @param ap_num int number of AP.  Only used in stumpy cells.
        @return string interface name "e.g. wlan0".

        """
        return self.router.get_hostapd_interface(ap_num)


    def get_wifi_host(self):
        """@return host object representing a pingable machine."""
        return self.router.host


    def configure(self, ap_config, multi_interface=None, is_ibss=None):
        """Configure a router with the given config.

        Configures an AP according to the specified config and
        enables whatever packet captures are appropriate.  Will deconfigure
        existing APs unless |multi_interface| is specified.

        @param ap_config HostapConfig object.
        @param multi_interface True iff having multiple configured interfaces
                is expected for this configure call.
        @param is_ibss True iff this is an IBSS endpoint.

        @raises error.TestNAError if the DUT does not support the requested
                frequency or IBSS mode.
        @raises error.TestFail if IBSS is combined with multi_interface.

        """
        if not self.client.is_frequency_supported(ap_config.frequency):
            raise error.TestNAError('DUT does not support frequency: %s' %
                                    ap_config.frequency)
        if ap_config.require_vht:
            self.client.require_capabilities(
                    [site_linux_system.LinuxSystem.CAPABILITY_VHT])
        ap_config.security_config.install_router_credentials(self.router.host)
        if is_ibss:
            if multi_interface:
                raise error.TestFail('IBSS mode does not support multiple '
                                     'interfaces.')
            if not self.client.is_ibss_supported():
                raise error.TestNAError('DUT does not support IBSS mode')
            self.router.ibss_configure(ap_config)
        else:
            self.router.hostap_configure(ap_config,
                                         multi_interface=multi_interface)
        # Captures are started only after the AP is configured.
        if self._enable_client_packet_captures:
            self.client.start_capture(ap_config.frequency,
                                      snaplen=self._packet_capture_snaplen)
        if self._enable_packet_captures:
            self.capture_host.start_capture(
                    ap_config.frequency,
                    ht_type=ap_config.ht_packet_capture_mode,
                    snaplen=self._packet_capture_snaplen)


    def setup(self):
        """Construct the state used in a WiFi test."""
        self._router = site_linux_router.build_router_proxy(
                test_name=self._test_name,
                client_hostname=self.client.host.hostname,
                router_addr=self._cmdline_args.get(self.CMDLINE_ROUTER_ADDR,
                                                   None))
        ping_helper = ping_runner.PingRunner()
        # A dedicated packet capture host is optional: use it only when an
        # address can be derived and the host answers pings.
        pcap_addr = dnsname_mangler.get_pcap_addr(
                self.client.host.hostname,
                allow_failure=True)
        if pcap_addr and ping_helper.simple_ping(pcap_addr):
            self._pcap_host = site_linux_system.LinuxSystem(
                    hosts.create_host(pcap_addr), 'pcap')
        # The attenuator host gives us the ability to attenuate particular
        # antennas on the router.  Most setups don't have this capability
        # and most tests do not require it.  We use this for RvR
        # (network_WiFi_AttenuatedPerf) and some roaming tests.
        attenuator_addr = dnsname_mangler.get_attenuator_addr(
                self.client.host.hostname,
                cmdline_override=self._cmdline_args.get(
                        self.CMDLINE_ATTEN_ADDR, None),
                allow_failure=True)
        if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
            self._attenuator = attenuator_controller.AttenuatorController(
                    attenuator_addr)
        # Set up a clean context to conduct WiFi tests in.
        self.client.shill.init_test_network_state()
        # The mere presence of a capture key enables that capture; its value
        # is not inspected.
        if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
            self._enable_client_packet_captures = True
        if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
            self._enable_packet_captures = True
        if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
            self._packet_capture_snaplen = int(
                    self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
        # None (rather than False) is stored when the key was not given.
        self.client.conductive = self._get_bool_cmdline_value(
                self.CMDLINE_CONDUCTIVE_RIG, None)
        for system in (self.client, self.router):
            system.sync_host_times()


    def teardown(self):
        """Teardown the state used in a WiFi test.

        Every component that was constructed is closed, even when closing
        an earlier one fails, so that one broken host cannot leak the
        others.  Each failure is logged; the first one is re-raised after
        all components have been attempted.

        """
        logging.debug('Tearing down the test context.')
        first_error = None
        for system in (self._attenuator, self._client_proxy,
                       self._router, self._pcap_host):
            if system is None:
                continue
            try:
                system.close()
            except Exception as e:
                logging.exception('Failed to close %r during teardown.',
                                  system)
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise first_error


    def assert_connect_wifi(self, wifi_params, description=None):
        """Connect to a WiFi network and check for success.

        Connect a DUT to a WiFi network and check that we connect successfully.

        @param wifi_params AssociationParameters describing network to connect.
        @param description string Additional text for logging messages.

        @returns AssociationResult if successful; None if wifi_params
                 contains expect_failure and the connection failed.

        @raises error.TestFail if the outcome does not match
                wifi_params.expect_failure.

        """
        if description:
            connect_name = '%s (%s)' % (wifi_params.ssid, description)
        else:
            connect_name = '%s' % wifi_params.ssid
        logging.info('Connecting to %s.', connect_name)
        assoc_result = xmlrpc_datatypes.deserialize(
                self.client.shill.connect_wifi(wifi_params))
        logging.info('Finished connection attempt to %s with times: '
                     'discovery=%.2f, association=%.2f, configuration=%.2f.',
                     connect_name,
                     assoc_result.discovery_time,
                     assoc_result.association_time,
                     assoc_result.configuration_time)

        if assoc_result.success and wifi_params.expect_failure:
            raise error.TestFail(
                    'Expected connection to %s to fail, but it was successful.' %
                    connect_name)

        if not assoc_result.success and not wifi_params.expect_failure:
            raise error.TestFail(
                    'Expected connection to %s to succeed, '
                    'but it failed with reason: %s.' % (
                            connect_name, assoc_result.failure_reason))

        # From here on the outcome matches the expectation.
        if wifi_params.expect_failure:
            logging.info('Unable to connect to %s, as intended.',
                         connect_name)
            return None

        logging.info('Connected successfully to %s, signal level: %r.',
                     connect_name, self.client.wifi_signal_level)
        return assoc_result


    def assert_ping_from_dut(self, ping_config=None, ap_num=None):
        """Ping a host on the WiFi network from the DUT.

        Ping a host reachable on the WiFi network from the DUT, and
        check that the ping is successful.  The host we ping depends
        on the test setup, sometimes that host may be the server and
        sometimes it will be the router itself.  Ping-ability may be
        used to confirm that a WiFi network is operating correctly.

        @param ping_config optional PingConfig object to override defaults.
        @param ap_num int which AP to ping if more than one is configured.
                Ignored when ping_config is given.

        """
        if ap_num is None:
            ap_num = 0
        if ping_config is None:
            ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
            ping_config = ping_runner.PingConfig(ping_ip)
        self.client.ping(ping_config)


    def assert_ping_from_server(self, ping_config=None):
        """Ping the DUT across the WiFi network from the server.

        Check that the ping is mostly successful and fail the test if it
        is not.

        @param ping_config optional PingConfig object to override defaults.

        """
        logging.info('Pinging from server.')
        if ping_config is None:
            ping_ip = self.client.wifi_ip
            ping_config = ping_runner.PingConfig(ping_ip)
        self.router.ping(ping_config)


    def wait_for_connection(self, ssid, freq=None, ap_num=None,
                            timeout_seconds=30):
        """Verifies a connection to network ssid on frequency freq.

        @param ssid string ssid of the network to check.
        @param freq int frequency of network to check.
        @param ap_num int AP to which to connect
        @param timeout_seconds int number of seconds to wait for
                connection on the given frequency.

        @returns a named tuple of (state, time)
        """
        if ap_num is None:
            ap_num = 0
        desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
        wifi_ip = self.router.get_wifi_ip(ap_num)
        return self.client.wait_for_connection(
                ssid, timeout_seconds=timeout_seconds, freq=freq,
                ping_ip=wifi_ip, desired_subnet=desired_subnet)