1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6 7from autotest_lib.client.common_lib import error 8from autotest_lib.client.common_lib.cros.network import ping_runner 9from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes 10from autotest_lib.server import hosts 11from autotest_lib.server import site_linux_router 12from autotest_lib.server import site_linux_system 13from autotest_lib.server.cros import dnsname_mangler 14from autotest_lib.server.cros.network import attenuator_controller 15from autotest_lib.server.cros.network import wifi_client 16 17class WiFiTestContextManager(object): 18 """A context manager for state used in WiFi autotests. 19 20 Some of the building blocks we use in WiFi tests need to be cleaned up 21 after use. For instance, we start an XMLRPC server on the client 22 which should be shut down so that the next test can start its instance. 23 It is convenient to manage this setup and teardown through a context 24 manager rather than building it into the test class logic. 25 26 """ 27 CMDLINE_ATTEN_ADDR = 'atten_addr' 28 CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture' 29 CMDLINE_CONDUCTIVE_RIG = 'conductive_rig' 30 CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen' 31 CMDLINE_ROUTER_ADDR = 'router_addr' 32 CMDLINE_PACKET_CAPTURES = 'packet_capture' 33 CMDLINE_USE_WPA_CLI = 'use_wpa_cli' 34 35 36 @property 37 def attenuator(self): 38 """@return attenuator object (e.g. a BeagleBone).""" 39 if self._attenuator is None: 40 raise error.TestNAError('No attenuator available in this setup.') 41 42 return self._attenuator 43 44 45 @property 46 def client(self): 47 """@return WiFiClient object abstracting the DUT.""" 48 return self._client_proxy 49 50 51 @property 52 def router(self): 53 """@return router object (e.g. a LinuxCrosRouter).""" 54 return self._router 55 56 57 @property 58 def pcap_host(self): 59 """@return Dedicated packet capture host or None.""" 60 return self._pcap_host 61 62 63 @property 64 def capture_host(self): 65 """@return Dedicated pcap_host or the router itself.""" 66 return self.pcap_host or self.router 67 68 69 def __init__(self, test_name, host, cmdline_args, debug_dir): 70 """Construct a WiFiTestContextManager. 71 72 Optionally can pull addresses of the server address, router address, 73 or router port from cmdline_args. 74 75 @param test_name string descriptive name for this test. 76 @param host host object representing the DUT. 77 @param cmdline_args dict of key, value settings from command line. 78 79 """ 80 super(WiFiTestContextManager, self).__init__() 81 self._test_name = test_name 82 self._cmdline_args = cmdline_args.copy() 83 self._client_proxy = wifi_client.WiFiClient( 84 host, debug_dir, 85 self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False)) 86 self._attenuator = None 87 self._router = None 88 self._pcap_host = None 89 self._enable_client_packet_captures = False 90 self._enable_packet_captures = False 91 self._packet_capture_snaplen = None 92 93 94 def __enter__(self): 95 self.setup() 96 return self 97 98 99 def __exit__(self, exc_type, exc_value, traceback): 100 self.teardown() 101 102 103 def _get_bool_cmdline_value(self, key, default_value): 104 """Returns a bool value for the given key from the cmdline args. 105 106 @param key string cmdline args key. 107 @param default_value value to return if the key is not specified in the 108 cmdline args. 109 110 @return True/False or default_value if key is not specified in the 111 cmdline args. 112 113 """ 114 if key in self._cmdline_args: 115 value = self._cmdline_args[key].lower() 116 if value in ('1', 'true', 'yes', 'y'): 117 return True 118 else: 119 return False 120 else: 121 return default_value 122 123 124 def get_wifi_addr(self, ap_num=0): 125 """Return an IPv4 address pingable by the client on the WiFi subnet. 126 127 @param ap_num int number of AP. Only used in stumpy cells. 128 @return string IPv4 address. 129 130 """ 131 return self.router.local_server_address(ap_num) 132 133 134 def get_wifi_if(self, ap_num=0): 135 """Returns the interface name for the IP address of self.get_wifi_addr. 136 137 @param ap_num int number of AP. Only used in stumpy cells. 138 @return string interface name "e.g. wlan0". 139 140 """ 141 return self.router.get_hostapd_interface(ap_num) 142 143 144 def get_wifi_host(self): 145 """@return host object representing a pingable machine.""" 146 return self.router.host 147 148 149 def configure(self, configuration_parameters, multi_interface=None, 150 is_ibss=None): 151 """Configure a router with the given parameters. 152 153 Configures an AP according to the specified parameters and 154 enables whatever packet captures are appropriate. Will deconfigure 155 existing APs unless |multi_interface| is specified. 156 157 @param configuration_parameters HostapConfig object. 158 @param multi_interface True iff having multiple configured interfaces 159 is expected for this configure call. 160 @param is_ibss True iff this is an IBSS endpoint. 161 162 """ 163 if not self.client.is_frequency_supported( 164 configuration_parameters.frequency): 165 raise error.TestNAError('DUT does not support frequency: %s' % 166 configuration_parameters.frequency) 167 configuration_parameters.security_config.install_router_credentials( 168 self.router.host) 169 if is_ibss: 170 if multi_interface: 171 raise error.TestFail('IBSS mode does not support multiple ' 172 'interfaces.') 173 if not self.client.is_ibss_supported(): 174 raise error.TestNAError('DUT does not support IBSS mode') 175 self.router.ibss_configure(configuration_parameters) 176 else: 177 self.router.hostap_configure(configuration_parameters, 178 multi_interface=multi_interface) 179 if self._enable_client_packet_captures: 180 self.client.start_capture(configuration_parameters.frequency, 181 snaplen=self._packet_capture_snaplen) 182 if self._enable_packet_captures: 183 self.capture_host.start_capture( 184 configuration_parameters.frequency, 185 ht_type=configuration_parameters.ht_packet_capture_mode, 186 snaplen=self._packet_capture_snaplen) 187 188 189 def setup(self): 190 """Construct the state used in a WiFi test.""" 191 self._router = site_linux_router.build_router_proxy( 192 test_name=self._test_name, 193 client_hostname=self.client.host.hostname, 194 router_addr=self._cmdline_args.get(self.CMDLINE_ROUTER_ADDR, 195 None)) 196 ping_helper = ping_runner.PingRunner() 197 pcap_addr = dnsname_mangler.get_pcap_addr( 198 self.client.host.hostname, 199 allow_failure=True) 200 if pcap_addr and ping_helper.simple_ping(pcap_addr): 201 self._pcap_host = site_linux_system.LinuxSystem( 202 hosts.create_host(pcap_addr),'pcap') 203 # The attenuator host gives us the ability to attenuate particular 204 # antennas on the router. Most setups don't have this capability 205 # and most tests do not require it. We use this for RvR 206 # (network_WiFi_AttenuatedPerf) and some roaming tests. 207 attenuator_addr = dnsname_mangler.get_attenuator_addr( 208 self.client.host.hostname, 209 cmdline_override=self._cmdline_args.get( 210 self.CMDLINE_ATTEN_ADDR, None), 211 allow_failure=True) 212 if attenuator_addr and ping_helper.simple_ping(attenuator_addr): 213 self._attenuator = attenuator_controller.AttenuatorController( 214 hosts.SSHHost(attenuator_addr, port=22)) 215 # Set up a clean context to conduct WiFi tests in. 216 self.client.shill.init_test_network_state() 217 if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args: 218 self._enable_client_packet_captures = True 219 if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args: 220 self._enable_packet_captures = True 221 if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args: 222 self._packet_capture_snaplen = int( 223 self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN]) 224 self.client.conductive = self._get_bool_cmdline_value( 225 self.CMDLINE_CONDUCTIVE_RIG, None) 226 for system in (self.client, self.router): 227 system.sync_host_times() 228 229 230 def teardown(self): 231 """Teardown the state used in a WiFi test.""" 232 logging.debug('Tearing down the test context.') 233 for system in [self._attenuator, self._client_proxy, 234 self._router, self._pcap_host]: 235 if system is not None: 236 system.close() 237 238 239 def assert_connect_wifi(self, wifi_params, description=None): 240 """Connect to a WiFi network and check for success. 241 242 Connect a DUT to a WiFi network and check that we connect successfully. 243 244 @param wifi_params AssociationParameters describing network to connect. 245 @param description string Additional text for logging messages. 246 247 @returns AssociationResult if successful; None if wifi_params 248 contains expect_failure; asserts otherwise. 249 250 """ 251 if description: 252 connect_name = '%s (%s)' % (wifi_params.ssid, description) 253 else: 254 connect_name = '%s' % wifi_params.ssid 255 logging.info('Connecting to %s.', connect_name) 256 assoc_result = xmlrpc_datatypes.deserialize( 257 self.client.shill.connect_wifi(wifi_params)) 258 logging.info('Finished connection attempt to %s with times: ' 259 'discovery=%.2f, association=%.2f, configuration=%.2f.', 260 connect_name, 261 assoc_result.discovery_time, 262 assoc_result.association_time, 263 assoc_result.configuration_time) 264 265 if assoc_result.success and wifi_params.expect_failure: 266 raise error.TestFail( 267 'Expected connection to %s to fail, but it was successful.' % 268 connect_name) 269 270 if not assoc_result.success and not wifi_params.expect_failure: 271 raise error.TestFail( 272 'Expected connection to %s to succeed, ' 273 'but it failed with reason: %s.' % ( 274 connect_name, assoc_result.failure_reason)) 275 276 if wifi_params.expect_failure: 277 logging.info('Unable to connect to %s, as intended.', 278 connect_name) 279 return None 280 281 logging.info('Connected successfully to %s, signal level: %r.', 282 connect_name, self.client.wifi_signal_level) 283 return assoc_result 284 285 286 def assert_ping_from_dut(self, ping_config=None, ap_num=None): 287 """Ping a host on the WiFi network from the DUT. 288 289 Ping a host reachable on the WiFi network from the DUT, and 290 check that the ping is successful. The host we ping depends 291 on the test setup, sometimes that host may be the server and 292 sometimes it will be the router itself. Ping-ability may be 293 used to confirm that a WiFi network is operating correctly. 294 295 @param ping_config optional PingConfig object to override defaults. 296 @param ap_num int which AP to ping if more than one is configured. 297 298 """ 299 if ap_num is None: 300 ap_num = 0 301 if ping_config is None: 302 ping_ip = self.router.get_wifi_ip(ap_num=ap_num) 303 ping_config = ping_runner.PingConfig(ping_ip) 304 self.client.ping(ping_config) 305 306 307 def assert_ping_from_server(self, ping_config=None): 308 """Ping the DUT across the WiFi network from the server. 309 310 Check that the ping is mostly successful and fail the test if it 311 is not. 312 313 @param ping_config optional PingConfig object to override defaults. 314 315 """ 316 logging.info('Pinging from server.') 317 if ping_config is None: 318 ping_ip = self.client.wifi_ip 319 ping_config = ping_runner.PingConfig(ping_ip) 320 self.router.ping(ping_config) 321 322 323 def wait_for_connection(self, ssid, freq=None, ap_num=None, 324 timeout_seconds=30): 325 """Verifies a connection to network ssid on frequency freq. 326 327 @param ssid string ssid of the network to check. 328 @param freq int frequency of network to check. 329 @param ap_num int AP to which to connect 330 @param timeout_seconds int number of seconds to wait for 331 connection on the given frequency. 332 333 @returns a named tuple of (state, time) 334 """ 335 if ap_num is None: 336 ap_num = 0 337 desired_subnet = self.router.get_wifi_ip_subnet(ap_num) 338 wifi_ip = self.router.get_wifi_ip(ap_num) 339 return self.client.wait_for_connection( 340 ssid, timeout_seconds=timeout_seconds, freq=freq, 341 ping_ip=wifi_ip, desired_subnet=desired_subnet) 342