# Lint as: python2, python3
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import ping_runner
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server import hosts
from autotest_lib.server import site_linux_router
from autotest_lib.server import site_linux_system
from autotest_lib.server.cros import dnsname_mangler
from autotest_lib.server.cros.network import attenuator_controller
from autotest_lib.server.cros.network import wifi_client

class WiFiTestContextManager(object):
    """A context manager for state used in WiFi autotests.

    Some of the building blocks we use in WiFi tests need to be cleaned up
    after use.  For instance, we start an XMLRPC server on the client
    which should be shut down so that the next test can start its instance.
    It is convenient to manage this setup and teardown through a context
    manager rather than building it into the test class logic.

    """
    # Keys recognized in the |cmdline_args| dict passed to __init__.
    CMDLINE_ATTEN_ADDR = 'atten_addr'
    CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
    CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
    CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
    CMDLINE_ROUTER_ADDR = 'router_addr'
    CMDLINE_PCAP_ADDR = 'pcap_addr'
    CMDLINE_PACKET_CAPTURES = 'packet_capture'
    CMDLINE_USE_WPA_CLI = 'use_wpa_cli'

    # Command line values (compared case-insensitively) treated as True.
    _TRUE_CMDLINE_VALUES = ('1', 'true', 'yes', 'y')


    @property
    def attenuator(self):
        """@return attenuator object (e.g. a BeagleBone).

        @raises TestNAError if no attenuator was set up.

        """
        if self._attenuator is None:
            raise error.TestNAError('No attenuator available in this setup.')

        return self._attenuator


    @property
    def client(self):
        """@return WiFiClient object abstracting the DUT."""
        return self._client_proxy


    @property
    def router(self):
        """@return router object (e.g. a LinuxCrosRouter)."""
        return self._router


    @property
    def pcap_host(self):
        """@return Dedicated packet capture host or None."""
        return self._pcap_host


    @property
    def capture_host(self):
        """@return Dedicated pcap_host or the router itself."""
        return self.pcap_host or self.router


    def __init__(self, test_name, host, cmdline_args, debug_dir):
        """Construct a WiFiTestContextManager.

        Optionally can pull addresses of the server address, router address,
        or router port from cmdline_args.

        @param test_name string descriptive name for this test.
        @param host host object representing the DUT.
        @param cmdline_args dict of key, value settings from command line.
        @param debug_dir passed through to WiFiClient; presumably the
                directory where debug artifacts are stored (confirm against
                wifi_client.WiFiClient).

        """
        super(WiFiTestContextManager, self).__init__()
        self._test_name = test_name
        # Copy so that later changes by the caller do not affect this context.
        self._cmdline_args = cmdline_args.copy()
        self._client_proxy = wifi_client.WiFiClient(
                host, debug_dir,
                self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
        self._attenuator = None
        self._router = None
        self._pcap_host = None
        # Initialized here so that configure() can safely read it even when
        # setup() was skipped or called with include_pcap=False.
        self._pcap_as_router = False
        self._enable_client_packet_captures = False
        self._enable_packet_captures = False
        self._packet_capture_snaplen = None


    def __enter__(self):
        """Run setup() with its defaults and return this context."""
        self.setup()
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        """Tear down the context; exceptions from the body still propagate."""
        self.teardown()


    def _get_bool_cmdline_value(self, key, default_value):
        """Returns a bool value for the given key from the cmdline args.

        @param key string cmdline args key.
        @param default_value value to return if the key is not specified in the
               cmdline args.

        @return True/False or default_value if key is not specified in the
               cmdline args.

        """
        if key not in self._cmdline_args:
            return default_value
        return self._cmdline_args[key].lower() in self._TRUE_CMDLINE_VALUES


    def get_wifi_addr(self, ap_num=0):
        """Return an IPv4 address pingable by the client on the WiFi subnet.

        @param ap_num int number of AP.  Only used in stumpy cells.
        @return string IPv4 address.

        """
        return self.router.local_server_address(ap_num)


    def get_wifi_if(self, ap_num=0):
        """Returns the interface name for the IP address of self.get_wifi_addr.

        @param ap_num int number of AP.  Only used in stumpy cells.
        @return string interface name "e.g. wlan0".

        """
        return self.router.get_hostapd_interface(ap_num)


    def get_wifi_host(self):
        """@return host object representing a pingable machine."""
        return self.router.host


    def configure(self, ap_config, multi_interface=None, is_ibss=None,
                  configure_pcap=False):
        """Configure a router with the given config.

        Configures an AP according to the specified config and
        enables whatever packet captures are appropriate.  Will deconfigure
        existing APs unless |multi_interface| is specified.

        @param ap_config HostapConfig object.
        @param multi_interface True iff having multiple configured interfaces
                is expected for this configure call.
        @param is_ibss True iff this is an IBSS endpoint.
        @param configure_pcap True iff pcap_host should be configured for this
                configure call. Raises a TestNAError if |self._pcap_as_router|
                is False or if no pcap host is available.

        @raises TestNAError if the DUT or the setup cannot support the
                requested configuration.
        @raises TestFail if IBSS mode is combined with |multi_interface|.

        """
        if configure_pcap and not self._pcap_as_router:
            raise error.TestNAError('pcap was not configured as router.')
        if not self.client.is_frequency_supported(ap_config.frequency):
            raise error.TestNAError('DUT does not support frequency: %s' %
                                    ap_config.frequency)
        if ap_config.require_vht:
            self.client.require_capabilities(
                    [site_linux_system.LinuxSystem.CAPABILITY_VHT])
        router = self.router
        if configure_pcap:
            router = self.pcap_host
            # _setup_pcap() leaves the pcap host unset when the device has no
            # address or does not answer pings; report that as "not
            # applicable" rather than failing with an AttributeError on None.
            if router is None:
                raise error.TestNAError(
                        'No pcap host available in this setup.')
        ap_config.security_config.install_router_credentials(router.host,
                                                             router.logdir)
        if is_ibss:
            if multi_interface:
                raise error.TestFail('IBSS mode does not support multiple '
                                     'interfaces.')
            if not self.client.is_ibss_supported():
                raise error.TestNAError('DUT does not support IBSS mode')
            router.ibss_configure(ap_config)
        else:
            router.hostap_configure(ap_config, multi_interface=multi_interface)
        if self._enable_client_packet_captures:
            self.client.start_capture(ap_config.frequency,
                                      snaplen=self._packet_capture_snaplen)
        if self._enable_packet_captures:
            self.capture_host.start_capture(
                    ap_config.frequency,
                    width_type=ap_config.packet_capture_mode,
                    snaplen=self._packet_capture_snaplen)


    def _setup_router(self):
        """Set up the router device."""
        self._router = site_linux_router.build_router_proxy(
                test_name=self._test_name,
                client_hostname=self.client.host.hostname,
                router_addr=self._cmdline_args.get(
                        self.CMDLINE_ROUTER_ADDR, None))
        self.router.sync_host_times()


    def _setup_pcap(self, pcap_as_router=False):
        """
        Set up the pcap device.

        The pcap host is only created when an address can be determined and
        that address answers a ping; otherwise |self._pcap_host| is left
        unchanged.

        @param pcap_as_router optional bool which should be True if the pcap
                should be configured as a router.
        """
        ping_helper = ping_runner.PingRunner()
        pcap_addr = dnsname_mangler.get_pcap_addr(
                self.client.host.hostname,
                allow_failure=True,
                cmdline_override=self._cmdline_args.get(self.CMDLINE_PCAP_ADDR,
                                                        None))
        if pcap_addr and ping_helper.simple_ping(pcap_addr):
            if pcap_as_router:
                self._pcap_host = site_linux_router.LinuxRouter(
                        hosts.create_host(pcap_addr),
                        role='pcap',
                        test_name=self._test_name)
            else:
                self._pcap_host = site_linux_system.LinuxSystem(
                        hosts.create_host(pcap_addr), 'pcap')


    def _setup_attenuator(self):
        """
        Set up the attenuator device.

        The attenuator host gives us the ability to attenuate particular
        antennas on the router.  Most setups don't have this capability
        and most tests do not require it.  We use this for RvR
        (network_WiFi_AttenuatedPerf) and some roaming tests.
        """
        ping_helper = ping_runner.PingRunner()
        attenuator_addr = dnsname_mangler.get_attenuator_addr(
                self.client.host.hostname,
                cmdline_override=self._cmdline_args.get(
                        self.CMDLINE_ATTEN_ADDR, None),
                allow_failure=True)
        if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
            self._attenuator = attenuator_controller.AttenuatorController(
                    attenuator_addr)


    def setup(self, include_router=True, include_pcap=True,
              include_attenuator=True, pcap_as_router=False):
        """
        Construct the state used in a WiFi test.

        @param include_router optional bool which should be False if the router
                is not used by the test
        @param include_pcap optional bool which should be False if the pcap
                device is not used by the test
        @param include_attenuator optional bool which should be False if the
                attenuator is not used by the test
        @param pcap_as_router optional bool which should be True if the pcap
                should be configured as a router.
        """
        if include_router:
            self._setup_router()
        if include_pcap:
            self._setup_pcap(pcap_as_router)
            self._pcap_as_router = pcap_as_router
        if include_attenuator:
            self._setup_attenuator()

        # Set up a clean context to conduct WiFi tests in.
        self.client.shill.init_test_network_state()
        self.client.sync_host_times()

        # The capture flags are enabled by the mere presence of the key; the
        # associated value is not inspected.
        if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
            self._enable_client_packet_captures = True
        if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
            self._enable_packet_captures = True
        if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
            self._packet_capture_snaplen = int(
                    self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
        # None (rather than False) is passed on when the key is absent.
        self.client.conductive = self._get_bool_cmdline_value(
                self.CMDLINE_CONDUCTIVE_RIG, None)


    def teardown(self):
        """Teardown the state used in a WiFi test."""
        logging.debug('Tearing down the test context.')
        for system in [self._attenuator, self._client_proxy,
                       self._router, self._pcap_host]:
            if system is not None:
                system.close()


    def assert_connect_wifi(self, wifi_params, description=None):
        """Connect to a WiFi network and check for success.

        Connect a DUT to a WiFi network and check that we connect successfully.

        @param wifi_params AssociationParameters describing network to connect.
        @param description string Additional text for logging messages.

        @returns AssociationResult if successful; None if wifi_params
                 contains expect_failure; asserts otherwise.

        @raises TestFail if the connection outcome does not match
                wifi_params.expect_failure.

        """
        if description:
            connect_name = '%s (%s)' % (wifi_params.ssid, description)
        else:
            connect_name = '%s' % wifi_params.ssid
        logging.info('Connecting to %s.', connect_name)
        assoc_result = xmlrpc_datatypes.deserialize(
                self.client.shill.connect_wifi(wifi_params))
        logging.info('Finished connection attempt to %s with times: '
                     'discovery=%.2f, association=%.2f, configuration=%.2f.',
                     connect_name,
                     assoc_result.discovery_time,
                     assoc_result.association_time,
                     assoc_result.configuration_time)

        if assoc_result.success and wifi_params.expect_failure:
            raise error.TestFail(
                'Expected connection to %s to fail, but it was successful.' %
                connect_name)

        if not assoc_result.success and not wifi_params.expect_failure:
            raise error.TestFail(
                'Expected connection to %s to succeed, '
                'but it failed with reason: %s.' % (
                    connect_name, assoc_result.failure_reason))

        if wifi_params.expect_failure:
            logging.info('Unable to connect to %s, as intended.',
                         connect_name)
            return None

        logging.info('Connected successfully to %s, signal level: %r.',
                     connect_name, self.client.wifi_signal_level)
        return assoc_result


    def assert_ping_from_dut(self, ping_config=None, ap_num=None):
        """Ping a host on the WiFi network from the DUT.

        Ping a host reachable on the WiFi network from the DUT, and
        check that the ping is successful.  The host we ping depends
        on the test setup, sometimes that host may be the server and
        sometimes it will be the router itself.  Ping-ability may be
        used to confirm that a WiFi network is operating correctly.

        @param ping_config optional PingConfig object to override defaults.
        @param ap_num int which AP to ping if more than one is configured.

        """
        if ap_num is None:
            ap_num = 0
        if ping_config is None:
            ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
            wifi_if = self.client.wifi_if
            ping_config = ping_runner.PingConfig(ping_ip, source_iface=wifi_if)
        self.client.ping(ping_config)


    def assert_ping_from_server(self, ping_config=None):
        """Ping the DUT across the WiFi network from the server.

        Check that the ping is mostly successful and fail the test if it
        is not.

        @param ping_config optional PingConfig object to override defaults.

        """
        logging.info('Pinging from server.')
        if ping_config is None:
            ping_ip = self.client.wifi_ip
            ping_config = ping_runner.PingConfig(ping_ip)
        self.router.ping(ping_config)


    def wait_for_connection(self, ssid, freq=None, ap_num=None,
                            timeout_seconds=30):
        """Verifies a connection to network ssid on frequency freq.

        @param ssid string ssid of the network to check.
        @param freq int frequency of network to check.
        @param ap_num int AP to which to connect
        @param timeout_seconds int number of seconds to wait for
                connection on the given frequency.

        @returns a named tuple of (state, time)
        """
        if ap_num is None:
            ap_num = 0
        desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
        wifi_ip = self.router.get_wifi_ip(ap_num)
        wifi_if = self.client.wifi_if

        # ping command used for verifying WiFi connection should bind to
        # WiFi interface, so that ping packets won't go through Ethernet
        # interface in some scenarios.
        # See b/199940334: autotest: wifi_client's wait_for_connection
        # does not consider NUD state
        return self.client.wait_for_connection(ssid,
                                               timeout_seconds=timeout_seconds,
                                               freq=freq,
                                               ping_ip=wifi_ip,
                                               desired_subnet=desired_subnet,
                                               source_iface=wifi_if)