1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6 7from autotest_lib.client.common_lib import error 8from autotest_lib.client.common_lib.cros.network import ping_runner 9from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes 10from autotest_lib.server import hosts 11from autotest_lib.server import site_linux_router 12from autotest_lib.server import site_linux_system 13from autotest_lib.server.cros import dnsname_mangler 14from autotest_lib.server.cros.network import attenuator_controller 15from autotest_lib.server.cros.network import wifi_client 16 17class WiFiTestContextManager(object): 18 """A context manager for state used in WiFi autotests. 19 20 Some of the building blocks we use in WiFi tests need to be cleaned up 21 after use. For instance, we start an XMLRPC server on the client 22 which should be shut down so that the next test can start its instance. 23 It is convenient to manage this setup and teardown through a context 24 manager rather than building it into the test class logic. 25 26 """ 27 CMDLINE_ATTEN_ADDR = 'atten_addr' 28 CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture' 29 CMDLINE_CONDUCTIVE_RIG = 'conductive_rig' 30 CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen' 31 CMDLINE_ROUTER_ADDR = 'router_addr' 32 CMDLINE_PCAP_ADDR = 'pcap_addr' 33 CMDLINE_PACKET_CAPTURES = 'packet_capture' 34 CMDLINE_USE_WPA_CLI = 'use_wpa_cli' 35 36 37 @property 38 def attenuator(self): 39 """@return attenuator object (e.g. a BeagleBone).""" 40 if self._attenuator is None: 41 raise error.TestNAError('No attenuator available in this setup.') 42 43 return self._attenuator 44 45 46 @property 47 def client(self): 48 """@return WiFiClient object abstracting the DUT.""" 49 return self._client_proxy 50 51 52 @property 53 def router(self): 54 """@return router object (e.g. a LinuxCrosRouter).""" 55 return self._router 56 57 58 @property 59 def pcap_host(self): 60 """@return Dedicated packet capture host or None.""" 61 return self._pcap_host 62 63 64 @property 65 def capture_host(self): 66 """@return Dedicated pcap_host or the router itself.""" 67 return self.pcap_host or self.router 68 69 70 def __init__(self, test_name, host, cmdline_args, debug_dir): 71 """Construct a WiFiTestContextManager. 72 73 Optionally can pull addresses of the server address, router address, 74 or router port from cmdline_args. 75 76 @param test_name string descriptive name for this test. 77 @param host host object representing the DUT. 78 @param cmdline_args dict of key, value settings from command line. 79 80 """ 81 super(WiFiTestContextManager, self).__init__() 82 self._test_name = test_name 83 self._cmdline_args = cmdline_args.copy() 84 self._client_proxy = wifi_client.WiFiClient( 85 host, debug_dir, 86 self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False)) 87 self._attenuator = None 88 self._router = None 89 self._pcap_host = None 90 self._enable_client_packet_captures = False 91 self._enable_packet_captures = False 92 self._packet_capture_snaplen = None 93 94 95 def __enter__(self): 96 self.setup() 97 return self 98 99 100 def __exit__(self, exc_type, exc_value, traceback): 101 self.teardown() 102 103 104 def _get_bool_cmdline_value(self, key, default_value): 105 """Returns a bool value for the given key from the cmdline args. 106 107 @param key string cmdline args key. 108 @param default_value value to return if the key is not specified in the 109 cmdline args. 110 111 @return True/False or default_value if key is not specified in the 112 cmdline args. 113 114 """ 115 if key in self._cmdline_args: 116 value = self._cmdline_args[key].lower() 117 if value in ('1', 'true', 'yes', 'y'): 118 return True 119 else: 120 return False 121 else: 122 return default_value 123 124 125 def get_wifi_addr(self, ap_num=0): 126 """Return an IPv4 address pingable by the client on the WiFi subnet. 127 128 @param ap_num int number of AP. Only used in stumpy cells. 129 @return string IPv4 address. 130 131 """ 132 return self.router.local_server_address(ap_num) 133 134 135 def get_wifi_if(self, ap_num=0): 136 """Returns the interface name for the IP address of self.get_wifi_addr. 137 138 @param ap_num int number of AP. Only used in stumpy cells. 139 @return string interface name "e.g. wlan0". 140 141 """ 142 return self.router.get_hostapd_interface(ap_num) 143 144 145 def get_wifi_host(self): 146 """@return host object representing a pingable machine.""" 147 return self.router.host 148 149 150 def configure(self, ap_config, multi_interface=None, is_ibss=None): 151 """Configure a router with the given config. 152 153 Configures an AP according to the specified config and 154 enables whatever packet captures are appropriate. Will deconfigure 155 existing APs unless |multi_interface| is specified. 156 157 @param ap_config HostapConfig object. 158 @param multi_interface True iff having multiple configured interfaces 159 is expected for this configure call. 160 @param is_ibss True iff this is an IBSS endpoint. 161 162 """ 163 if not self.client.is_frequency_supported(ap_config.frequency): 164 raise error.TestNAError('DUT does not support frequency: %s' % 165 ap_config.frequency) 166 if ap_config.require_vht: 167 self.client.require_capabilities( 168 [site_linux_system.LinuxSystem.CAPABILITY_VHT]) 169 ap_config.security_config.install_router_credentials(self.router.host) 170 if is_ibss: 171 if multi_interface: 172 raise error.TestFail('IBSS mode does not support multiple ' 173 'interfaces.') 174 if not self.client.is_ibss_supported(): 175 raise error.TestNAError('DUT does not support IBSS mode') 176 self.router.ibss_configure(ap_config) 177 else: 178 self.router.hostap_configure(ap_config, 179 multi_interface=multi_interface) 180 if self._enable_client_packet_captures: 181 self.client.start_capture(ap_config.frequency, 182 snaplen=self._packet_capture_snaplen) 183 if self._enable_packet_captures: 184 self.capture_host.start_capture(ap_config.frequency, 185 width_type=ap_config.packet_capture_mode, 186 snaplen=self._packet_capture_snaplen) 187 188 189 def _setup_router(self): 190 """Set up the router device.""" 191 self._router = site_linux_router.build_router_proxy( 192 test_name=self._test_name, 193 client_hostname=self.client.host.hostname, 194 router_addr=self._cmdline_args.get( 195 self.CMDLINE_ROUTER_ADDR, None)) 196 self.router.sync_host_times() 197 198 199 def _setup_pcap(self): 200 """Set up the pcap device.""" 201 ping_helper = ping_runner.PingRunner() 202 pcap_addr = dnsname_mangler.get_pcap_addr( 203 self.client.host.hostname, 204 allow_failure=True, 205 cmdline_override=self._cmdline_args.get(self.CMDLINE_PCAP_ADDR, 206 None)) 207 if pcap_addr and ping_helper.simple_ping(pcap_addr): 208 self._pcap_host = site_linux_system.LinuxSystem( 209 hosts.create_host(pcap_addr),'pcap') 210 211 212 def _setup_attenuator(self): 213 """ 214 Set up the attenuator device. 215 216 The attenuator host gives us the ability to attenuate particular 217 antennas on the router. Most setups don't have this capability 218 and most tests do not require it. We use this for RvR 219 (network_WiFi_AttenuatedPerf) and some roaming tests. 220 """ 221 ping_helper = ping_runner.PingRunner() 222 attenuator_addr = dnsname_mangler.get_attenuator_addr( 223 self.client.host.hostname, 224 cmdline_override=self._cmdline_args.get( 225 self.CMDLINE_ATTEN_ADDR, None), 226 allow_failure=True) 227 if attenuator_addr and ping_helper.simple_ping(attenuator_addr): 228 self._attenuator = attenuator_controller.AttenuatorController( 229 attenuator_addr) 230 231 def setup(self, include_router=True, include_pcap=True, 232 include_attenuator=True): 233 """ 234 Construct the state used in a WiFi test. 235 236 @param include_router optional bool which should be False if the router 237 is not used by the test 238 @param include_pcap optional bool which should be False if the pcap 239 device is not used by the test 240 @param include_attenuator optional bool which should be False if the 241 attenuator is not used by the test 242 """ 243 if include_router: 244 self._setup_router() 245 if include_pcap: 246 self._setup_pcap() 247 if include_attenuator: 248 self._setup_attenuator() 249 250 # Set up a clean context to conduct WiFi tests in. 251 self.client.shill.init_test_network_state() 252 self.client.sync_host_times() 253 254 if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args: 255 self._enable_client_packet_captures = True 256 if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args: 257 self._enable_packet_captures = True 258 if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args: 259 self._packet_capture_snaplen = int( 260 self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN]) 261 self.client.conductive = self._get_bool_cmdline_value( 262 self.CMDLINE_CONDUCTIVE_RIG, None) 263 264 265 def teardown(self): 266 """Teardown the state used in a WiFi test.""" 267 logging.debug('Tearing down the test context.') 268 for system in [self._attenuator, self._client_proxy, 269 self._router, self._pcap_host]: 270 if system is not None: 271 system.close() 272 273 274 def assert_connect_wifi(self, wifi_params, description=None): 275 """Connect to a WiFi network and check for success. 276 277 Connect a DUT to a WiFi network and check that we connect successfully. 278 279 @param wifi_params AssociationParameters describing network to connect. 280 @param description string Additional text for logging messages. 281 282 @returns AssociationResult if successful; None if wifi_params 283 contains expect_failure; asserts otherwise. 284 285 """ 286 if description: 287 connect_name = '%s (%s)' % (wifi_params.ssid, description) 288 else: 289 connect_name = '%s' % wifi_params.ssid 290 logging.info('Connecting to %s.', connect_name) 291 assoc_result = xmlrpc_datatypes.deserialize( 292 self.client.shill.connect_wifi(wifi_params)) 293 logging.info('Finished connection attempt to %s with times: ' 294 'discovery=%.2f, association=%.2f, configuration=%.2f.', 295 connect_name, 296 assoc_result.discovery_time, 297 assoc_result.association_time, 298 assoc_result.configuration_time) 299 300 if assoc_result.success and wifi_params.expect_failure: 301 raise error.TestFail( 302 'Expected connection to %s to fail, but it was successful.' % 303 connect_name) 304 305 if not assoc_result.success and not wifi_params.expect_failure: 306 raise error.TestFail( 307 'Expected connection to %s to succeed, ' 308 'but it failed with reason: %s.' % ( 309 connect_name, assoc_result.failure_reason)) 310 311 if wifi_params.expect_failure: 312 logging.info('Unable to connect to %s, as intended.', 313 connect_name) 314 return None 315 316 logging.info('Connected successfully to %s, signal level: %r.', 317 connect_name, self.client.wifi_signal_level) 318 return assoc_result 319 320 321 def assert_ping_from_dut(self, ping_config=None, ap_num=None): 322 """Ping a host on the WiFi network from the DUT. 323 324 Ping a host reachable on the WiFi network from the DUT, and 325 check that the ping is successful. The host we ping depends 326 on the test setup, sometimes that host may be the server and 327 sometimes it will be the router itself. Ping-ability may be 328 used to confirm that a WiFi network is operating correctly. 329 330 @param ping_config optional PingConfig object to override defaults. 331 @param ap_num int which AP to ping if more than one is configured. 332 333 """ 334 if ap_num is None: 335 ap_num = 0 336 if ping_config is None: 337 ping_ip = self.router.get_wifi_ip(ap_num=ap_num) 338 ping_config = ping_runner.PingConfig(ping_ip) 339 self.client.ping(ping_config) 340 341 342 def assert_ping_from_server(self, ping_config=None): 343 """Ping the DUT across the WiFi network from the server. 344 345 Check that the ping is mostly successful and fail the test if it 346 is not. 347 348 @param ping_config optional PingConfig object to override defaults. 349 350 """ 351 logging.info('Pinging from server.') 352 if ping_config is None: 353 ping_ip = self.client.wifi_ip 354 ping_config = ping_runner.PingConfig(ping_ip) 355 self.router.ping(ping_config) 356 357 358 def wait_for_connection(self, ssid, freq=None, ap_num=None, 359 timeout_seconds=30): 360 """Verifies a connection to network ssid on frequency freq. 361 362 @param ssid string ssid of the network to check. 363 @param freq int frequency of network to check. 364 @param ap_num int AP to which to connect 365 @param timeout_seconds int number of seconds to wait for 366 connection on the given frequency. 367 368 @returns a named tuple of (state, time) 369 """ 370 if ap_num is None: 371 ap_num = 0 372 desired_subnet = self.router.get_wifi_ip_subnet(ap_num) 373 wifi_ip = self.router.get_wifi_ip(ap_num) 374 return self.client.wait_for_connection( 375 ssid, timeout_seconds=timeout_seconds, freq=freq, 376 ping_ip=wifi_ip, desired_subnet=desired_subnet) 377