# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import ping_runner
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server import hosts
from autotest_lib.server import site_linux_router
from autotest_lib.server import site_linux_system
from autotest_lib.server.cros import dnsname_mangler
from autotest_lib.server.cros.network import attenuator_controller
from autotest_lib.server.cros.network import wifi_client


class WiFiTestContextManager(object):
    """A context manager for state used in WiFi autotests.

    Some of the building blocks we use in WiFi tests need to be cleaned up
    after use.  For instance, we start an XMLRPC server on the client
    which should be shut down so that the next test can start its instance.
    It is convenient to manage this setup and teardown through a context
    manager rather than building it into the test class logic.

    """
    # Keys looked up in the |cmdline_args| dict handed to the constructor.
    CMDLINE_ATTEN_ADDR = 'atten_addr'
    CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
    CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
    CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
    CMDLINE_ROUTER_ADDR = 'router_addr'
    CMDLINE_PCAP_ADDR = 'pcap_addr'
    CMDLINE_PACKET_CAPTURES = 'packet_capture'
    CMDLINE_USE_WPA_CLI = 'use_wpa_cli'


    @property
    def attenuator(self):
        """@return attenuator object (e.g. a BeagleBone).

        @raises TestNAError if no attenuator was set up.

        """
        if self._attenuator is None:
            raise error.TestNAError('No attenuator available in this setup.')

        return self._attenuator


    @property
    def client(self):
        """@return WiFiClient object abstracting the DUT."""
        return self._client_proxy


    @property
    def router(self):
        """@return router object (e.g. a LinuxCrosRouter)."""
        return self._router


    @property
    def pcap_host(self):
        """@return Dedicated packet capture host or None."""
        return self._pcap_host


    @property
    def capture_host(self):
        """@return Dedicated pcap_host or the router itself."""
        return self.pcap_host or self.router


    def __init__(self, test_name, host, cmdline_args, debug_dir):
        """Construct a WiFiTestContextManager.

        Optionally can pull addresses of the server address, router address,
        or router port from cmdline_args.

        @param test_name string descriptive name for this test.
        @param host host object representing the DUT.
        @param cmdline_args dict of key, value settings from command line.
        @param debug_dir passed through unchanged to the WiFiClient
                constructor.

        """
        super(WiFiTestContextManager, self).__init__()
        self._test_name = test_name
        # Work on a copy so the caller's dict is never shared with us.
        self._cmdline_args = cmdline_args.copy()
        self._client_proxy = wifi_client.WiFiClient(
                host, debug_dir,
                self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
        self._attenuator = None
        self._router = None
        self._pcap_host = None
        # Initialized here (not only in setup()) so that configure() can
        # always read it, even if setup() skipped the pcap step or was never
        # called.
        self._pcap_as_router = False
        self._enable_client_packet_captures = False
        self._enable_packet_captures = False
        self._packet_capture_snaplen = None


    def __enter__(self):
        """Run setup() with its defaults and return this context manager."""
        self.setup()
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """Run teardown(); exceptions from the with-body are not suppressed."""
        self.teardown()


    def _get_bool_cmdline_value(self, key, default_value):
        """Returns a bool value for the given key from the cmdline args.

        The values '1', 'true', 'yes' and 'y' (case-insensitive) are treated
        as True; any other specified value is treated as False.

        @param key string cmdline args key.
        @param default_value value to return if the key is not specified in the
               cmdline args.

        @return True/False or default_value if key is not specified in the
                cmdline args.

        """
        if key not in self._cmdline_args:
            return default_value

        return self._cmdline_args[key].lower() in ('1', 'true', 'yes', 'y')


    def get_wifi_addr(self, ap_num=0):
        """Return an IPv4 address pingable by the client on the WiFi subnet.

        @param ap_num int number of AP.  Only used in stumpy cells.
        @return string IPv4 address.

        """
        return self.router.local_server_address(ap_num)


    def get_wifi_if(self, ap_num=0):
        """Returns the interface name for the IP address of self.get_wifi_addr.

        @param ap_num int number of AP.  Only used in stumpy cells.
        @return string interface name "e.g. wlan0".

        """
        return self.router.get_hostapd_interface(ap_num)


    def get_wifi_host(self):
        """@return host object representing a pingable machine."""
        return self.router.host


    def configure(self, ap_config, multi_interface=None, is_ibss=None,
                  configure_pcap=False):
        """Configure a router with the given config.

        Configures an AP according to the specified config and
        enables whatever packet captures are appropriate.  Will deconfigure
        existing APs unless |multi_interface| is specified.

        @param ap_config HostapConfig object.
        @param multi_interface True iff having multiple configured interfaces
                is expected for this configure call.
        @param is_ibss True iff this is an IBSS endpoint.
        @param configure_pcap True iff pcap_host should be configured for this
                configure call. Raises a TestNAError if the pcap device was
                not set up as a router or no pcap host is available.

        @raises TestNAError if the DUT lacks a required capability (frequency,
                IBSS) or the pcap device cannot be used as requested.
        @raises TestFail if IBSS is combined with |multi_interface|.

        """
        if configure_pcap:
            if not self._pcap_as_router:
                raise error.TestNAError('pcap was not configured as router.')
            # The pcap device may have been requested as a router yet never
            # created (no address, or it did not answer a ping); report that
            # as "not applicable" instead of failing on a None dereference.
            if self.pcap_host is None:
                raise error.TestNAError(
                        'No pcap host available to configure as a router.')
        if not self.client.is_frequency_supported(ap_config.frequency):
            raise error.TestNAError('DUT does not support frequency: %s' %
                                    ap_config.frequency)
        if ap_config.require_vht:
            self.client.require_capabilities(
                    [site_linux_system.LinuxSystem.CAPABILITY_VHT])
        router = self.router
        if configure_pcap:
            router = self.pcap_host
        ap_config.security_config.install_router_credentials(router.host,
                                                             router.logdir)
        if is_ibss:
            if multi_interface:
                raise error.TestFail('IBSS mode does not support multiple '
                                     'interfaces.')
            if not self.client.is_ibss_supported():
                raise error.TestNAError('DUT does not support IBSS mode')
            router.ibss_configure(ap_config)
        else:
            router.hostap_configure(ap_config, multi_interface=multi_interface)
        if self._enable_client_packet_captures:
            self.client.start_capture(ap_config.frequency,
                                      snaplen=self._packet_capture_snaplen)
        if self._enable_packet_captures:
            self.capture_host.start_capture(
                    ap_config.frequency,
                    width_type=ap_config.packet_capture_mode,
                    snaplen=self._packet_capture_snaplen)


    def _setup_router(self):
        """Set up the router device."""
        self._router = site_linux_router.build_router_proxy(
                test_name=self._test_name,
                client_hostname=self.client.host.hostname,
                router_addr=self._cmdline_args.get(
                        self.CMDLINE_ROUTER_ADDR, None))
        self.router.sync_host_times()


    def _setup_pcap(self, pcap_as_router=False):
        """
        Set up the pcap device.

        The pcap host is only created when an address can be determined and
        that address answers a ping; otherwise self._pcap_host stays None.

        @param pcap_as_router optional bool which should be True if the pcap
                should be configured as a router.
        """
        ping_helper = ping_runner.PingRunner()
        pcap_addr = dnsname_mangler.get_pcap_addr(
                self.client.host.hostname,
                allow_failure=True,
                cmdline_override=self._cmdline_args.get(self.CMDLINE_PCAP_ADDR,
                                                        None))
        if pcap_addr and ping_helper.simple_ping(pcap_addr):
            if pcap_as_router:
                self._pcap_host = site_linux_router.LinuxRouter(
                        hosts.create_host(pcap_addr),
                        role='pcap',
                        test_name=self._test_name)
            else:
                self._pcap_host = site_linux_system.LinuxSystem(
                        hosts.create_host(pcap_addr), 'pcap')


    def _setup_attenuator(self):
        """
        Set up the attenuator device.

        The attenuator host gives us the ability to attenuate particular
        antennas on the router.  Most setups don't have this capability
        and most tests do not require it.  We use this for RvR
        (network_WiFi_AttenuatedPerf) and some roaming tests.
        """
        ping_helper = ping_runner.PingRunner()
        attenuator_addr = dnsname_mangler.get_attenuator_addr(
                self.client.host.hostname,
                cmdline_override=self._cmdline_args.get(
                        self.CMDLINE_ATTEN_ADDR, None),
                allow_failure=True)
        if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
            self._attenuator = attenuator_controller.AttenuatorController(
                    attenuator_addr)


    def setup(self, include_router=True, include_pcap=True,
              include_attenuator=True, pcap_as_router=False):
        """
        Construct the state used in a WiFi test.

        @param include_router optional bool which should be False if the router
                is not used by the test
        @param include_pcap optional bool which should be False if the pcap
                device is not used by the test
        @param include_attenuator optional bool which should be False if the
                attenuator is not used by the test
        @param pcap_as_router optional bool which should be True if the pcap
                should be configured as a router.
        """
        if include_router:
            self._setup_router()
        if include_pcap:
            self._setup_pcap(pcap_as_router)
            self._pcap_as_router = pcap_as_router
        if include_attenuator:
            self._setup_attenuator()

        # Set up a clean context to conduct WiFi tests in.
        self.client.shill.init_test_network_state()
        self.client.sync_host_times()

        # For the two capture flags, the mere presence of the key enables the
        # feature; its value is not inspected.
        if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
            self._enable_client_packet_captures = True
        if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
            self._enable_packet_captures = True
        if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
            self._packet_capture_snaplen = int(
                    self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
        # None (rather than False) is passed on when the flag is absent.
        self.client.conductive = self._get_bool_cmdline_value(
                self.CMDLINE_CONDUCTIVE_RIG, None)


    def teardown(self):
        """Teardown the state used in a WiFi test."""
        logging.debug('Tearing down the test context.')
        for system in [self._attenuator, self._client_proxy,
                       self._router, self._pcap_host]:
            if system is not None:
                system.close()


    def assert_connect_wifi(self, wifi_params, description=None):
        """Connect to a WiFi network and check for success.

        Connect a DUT to a WiFi network and check that we connect successfully.

        @param wifi_params AssociationParameters describing network to connect.
        @param description string Additional text for logging messages.

        @returns AssociationResult if successful; None if wifi_params
                 contains expect_failure; asserts otherwise.

        """
        if description:
            connect_name = '%s (%s)' % (wifi_params.ssid, description)
        else:
            connect_name = '%s' % wifi_params.ssid
        logging.info('Connecting to %s.', connect_name)
        assoc_result = xmlrpc_datatypes.deserialize(
                self.client.shill.connect_wifi(wifi_params))
        logging.info('Finished connection attempt to %s with times: '
                     'discovery=%.2f, association=%.2f, configuration=%.2f.',
                     connect_name,
                     assoc_result.discovery_time,
                     assoc_result.association_time,
                     assoc_result.configuration_time)

        if assoc_result.success and wifi_params.expect_failure:
            raise error.TestFail(
                'Expected connection to %s to fail, but it was successful.' %
                connect_name)

        if not assoc_result.success and not wifi_params.expect_failure:
            raise error.TestFail(
                'Expected connection to %s to succeed, '
                'but it failed with reason: %s.' % (
                    connect_name, assoc_result.failure_reason))

        # Past this point the outcome matched the expectation.
        if wifi_params.expect_failure:
            logging.info('Unable to connect to %s, as intended.',
                         connect_name)
            return None

        logging.info('Connected successfully to %s, signal level: %r.',
                     connect_name, self.client.wifi_signal_level)
        return assoc_result


    def assert_ping_from_dut(self, ping_config=None, ap_num=None):
        """Ping a host on the WiFi network from the DUT.

        Ping a host reachable on the WiFi network from the DUT, and
        check that the ping is successful.  The host we ping depends
        on the test setup, sometimes that host may be the server and
        sometimes it will be the router itself.  Ping-ability may be
        used to confirm that a WiFi network is operating correctly.

        @param ping_config optional PingConfig object to override defaults.
        @param ap_num int which AP to ping if more than one is configured.
                Only consulted when |ping_config| is not given.

        """
        if ap_num is None:
            ap_num = 0
        if ping_config is None:
            ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
            ping_config = ping_runner.PingConfig(ping_ip)
        self.client.ping(ping_config)


    def assert_ping_from_server(self, ping_config=None):
        """Ping the DUT across the WiFi network from the server.

        Check that the ping is mostly successful and fail the test if it
        is not.

        @param ping_config optional PingConfig object to override defaults.

        """
        logging.info('Pinging from server.')
        if ping_config is None:
            ping_ip = self.client.wifi_ip
            ping_config = ping_runner.PingConfig(ping_ip)
        self.router.ping(ping_config)


    def wait_for_connection(self, ssid, freq=None, ap_num=None,
                            timeout_seconds=30):
        """Verifies a connection to network ssid on frequency freq.

        @param ssid string ssid of the network to check.
        @param freq int frequency of network to check.
        @param ap_num int AP to which to connect
        @param timeout_seconds int number of seconds to wait for
                connection on the given frequency.

        @returns a named tuple of (state, time)
        """
        if ap_num is None:
            ap_num = 0
        desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
        wifi_ip = self.router.get_wifi_ip(ap_num)
        return self.client.wait_for_connection(
                ssid, timeout_seconds=timeout_seconds, freq=freq,
                ping_ip=wifi_ip, desired_subnet=desired_subnet)