1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import contextlib 6import logging 7import math 8import re 9import time 10 11from contextlib import contextmanager 12from collections import namedtuple 13 14from autotest_lib.client.common_lib import error 15from autotest_lib.client.common_lib import utils 16from autotest_lib.client.common_lib.cros.network import interface 17from autotest_lib.client.common_lib.cros.network import iw_runner 18from autotest_lib.client.common_lib.cros.network import ping_runner 19from autotest_lib.client.cros import constants 20from autotest_lib.server import autotest 21from autotest_lib.server import site_linux_system 22from autotest_lib.server.cros.network import wpa_cli_proxy 23from autotest_lib.server.cros.network import wpa_mon 24from autotest_lib.server.hosts import cast_os_host 25 26# Wake-on-WiFi feature strings 27WAKE_ON_WIFI_NONE = 'none' 28WAKE_ON_WIFI_PACKET = 'packet' 29WAKE_ON_WIFI_DARKCONNECT = 'darkconnect' 30WAKE_ON_WIFI_PACKET_DARKCONNECT = 'packet_and_darkconnect' 31WAKE_ON_WIFI_NOT_SUPPORTED = 'not_supported' 32 33# Wake-on-WiFi test timing constants 34SUSPEND_WAIT_TIME_SECONDS = 10 35RECEIVE_PACKET_WAIT_TIME_SECONDS = 10 36DARK_RESUME_WAIT_TIME_SECONDS = 25 37WAKE_TO_SCAN_PERIOD_SECONDS = 30 38NET_DETECT_SCAN_WAIT_TIME_SECONDS = 15 39WAIT_UP_TIMEOUT_SECONDS = 10 40DISCONNECT_WAIT_TIME_SECONDS = 10 41INTERFACE_DOWN_WAIT_TIME_SECONDS = 10 42 43ConnectTime = namedtuple('ConnectTime', 'state, time') 44 45XMLRPC_BRINGUP_TIMEOUT_SECONDS = 60 46SHILL_XMLRPC_LOG_PATH = '/var/log/shill_xmlrpc_server.log' 47 48 49def _is_eureka_host(host): 50 return host.get_os_type() == cast_os_host.OS_TYPE_CAST_OS 51 52 53def get_xmlrpc_proxy(host): 54 """Get a shill XMLRPC proxy for |host|. 55 56 The returned object has no particular type. Instead, when you call 57 a method on the object, it marshalls the objects passed as arguments 58 and uses them to make RPCs on the remote server. Thus, you should 59 read shill_xmlrpc_server.py to find out what methods are supported. 60 61 @param host: host object representing a remote device. 62 @return proxy object for remote XMLRPC server. 63 64 """ 65 # Make sure the client library is on the device so that the proxy 66 # code is there when we try to call it. 67 if host.is_client_install_supported: 68 client_at = autotest.Autotest(host) 69 client_at.install() 70 # This is the default port for shill xmlrpc server. 71 server_port = constants.SHILL_XMLRPC_SERVER_PORT 72 xmlrpc_server_command = constants.SHILL_XMLRPC_SERVER_COMMAND 73 log_path = SHILL_XMLRPC_LOG_PATH 74 command_name = constants.SHILL_XMLRPC_SERVER_CLEANUP_PATTERN 75 rpc_server_host = host 76 77 # Start up the XMLRPC proxy on the client 78 proxy = rpc_server_host.rpc_server_tracker.xmlrpc_connect( 79 xmlrpc_server_command, 80 server_port, 81 command_name=command_name, 82 ready_test_name=constants.SHILL_XMLRPC_SERVER_READY_METHOD, 83 timeout_seconds=XMLRPC_BRINGUP_TIMEOUT_SECONDS, 84 logfile=log_path 85 ) 86 return proxy 87 88 89def _is_conductive(host): 90 """Determine if the host is conductive based on AFE labels. 91 92 @param host: A Host object. 93 """ 94 info = host.host_info_store.get() 95 conductive = info.get_label_value('conductive') 96 return conductive.lower() == 'true' 97 98 99class WiFiClient(site_linux_system.LinuxSystem): 100 """WiFiClient is a thin layer of logic over a remote DUT in wifitests.""" 101 102 DEFAULT_PING_COUNT = 10 103 COMMAND_PING = 'ping' 104 105 MAX_SERVICE_GONE_TIMEOUT_SECONDS = 60 106 107 # List of interface names we won't consider for use as "the" WiFi interface 108 # on Android or CastOS hosts. 109 WIFI_IF_BLACKLIST = ['p2p0', 'wfd0'] 110 111 UNKNOWN_BOARD_TYPE = 'unknown' 112 113 # DBus device properties. Wireless interfaces should support these. 114 WAKE_ON_WIFI_FEATURES = 'WakeOnWiFiFeaturesEnabled' 115 NET_DETECT_SCAN_PERIOD = 'NetDetectScanPeriodSeconds' 116 WAKE_TO_SCAN_PERIOD = 'WakeToScanPeriodSeconds' 117 FORCE_WAKE_TO_SCAN_TIMER = 'ForceWakeToScanTimer' 118 MAC_ADDRESS_RANDOMIZATION_SUPPORTED = 'MACAddressRandomizationSupported' 119 MAC_ADDRESS_RANDOMIZATION_ENABLED = 'MACAddressRandomizationEnabled' 120 121 CONNECTED_STATES = ['portal', 'no-connectivity', 'redirect-found', 122 'portal-suspected', 'online', 'ready'] 123 124 @property 125 def machine_id(self): 126 """@return string unique to a particular board/cpu configuration.""" 127 if self._machine_id: 128 return self._machine_id 129 130 uname_result = self.host.run('uname -m', ignore_status=True) 131 kernel_arch = '' 132 if not uname_result.exit_status and uname_result.stdout.find(' ') < 0: 133 kernel_arch = uname_result.stdout.strip() 134 cpu_info = self.host.run('cat /proc/cpuinfo').stdout.splitlines() 135 cpu_count = len(filter(lambda x: x.lower().startswith('bogomips'), 136 cpu_info)) 137 cpu_count_str = '' 138 if cpu_count: 139 cpu_count_str = 'x%d' % cpu_count 140 ghz_value = '' 141 ghz_pattern = re.compile('([0-9.]+GHz)') 142 for line in cpu_info: 143 match = ghz_pattern.search(line) 144 if match is not None: 145 ghz_value = '_' + match.group(1) 146 break 147 148 return '%s_%s%s%s' % (self.board, kernel_arch, ghz_value, cpu_count_str) 149 150 151 @property 152 def powersave_on(self): 153 """@return bool True iff WiFi powersave mode is enabled.""" 154 result = self.host.run("iw dev %s get power_save" % self.wifi_if) 155 output = result.stdout.rstrip() # NB: chop \n 156 # Output should be either "Power save: on" or "Power save: off". 157 find_re = re.compile('([^:]+):\s+(\w+)') 158 find_results = find_re.match(output) 159 if not find_results: 160 raise error.TestFail('Failed to find power_save parameter ' 161 'in iw results.') 162 163 return find_results.group(2) == 'on' 164 165 166 @property 167 def shill(self): 168 """@return shill RPCProxy object.""" 169 return self._shill_proxy 170 171 172 @property 173 def client(self): 174 """Deprecated accessor for the client host. 175 176 The term client is used very loosely in old autotests and this 177 accessor should not be used in new code. Use host() instead. 178 179 @return host object representing a remote DUT. 180 181 """ 182 return self.host 183 184 185 @property 186 def command_ip(self): 187 """@return string path to ip command.""" 188 return self._command_ip 189 190 191 @property 192 def command_iptables(self): 193 """@return string path to iptables command.""" 194 return self._command_iptables 195 196 197 @property 198 def command_ping6(self): 199 """@return string path to ping6 command.""" 200 return self._command_ping6 201 202 203 @property 204 def command_wpa_cli(self): 205 """@return string path to wpa_cli command.""" 206 return self._command_wpa_cli 207 208 209 @property 210 def conductive(self): 211 """@return True if the rig is conductive; False otherwise.""" 212 if self._conductive is None: 213 self._conductive = _is_conductive(self.host) 214 return self._conductive 215 216 217 @conductive.setter 218 def conductive(self, value): 219 """Set the conductive member to True or False. 220 221 @param value: boolean value to set the conductive member to. 222 """ 223 self._conductive = value 224 225 226 @property 227 def module_name(self): 228 """@return Name of kernel module in use by this interface.""" 229 return self._interface.module_name 230 231 @property 232 def parent_device_name(self): 233 """ 234 @return Path of the parent device for the net device""" 235 return self._interface.parent_device_name 236 237 @property 238 def wifi_if(self): 239 """@return string wifi device on machine (e.g. mlan0).""" 240 return self._wifi_if 241 242 243 @property 244 def wifi_mac(self): 245 """@return string MAC address of self.wifi_if.""" 246 return self._interface.mac_address 247 248 249 @property 250 def wifi_ip(self): 251 """@return string IPv4 address of self.wifi_if.""" 252 return self._interface.ipv4_address 253 254 255 @property 256 def wifi_ip_subnet(self): 257 """@return string IPv4 subnet prefix of self.wifi_if.""" 258 return self._interface.ipv4_subnet 259 260 261 @property 262 def wifi_phy_name(self): 263 """@return wiphy name (e.g., 'phy0') or None""" 264 return self._interface.wiphy_name 265 266 @property 267 def wifi_signal_level(self): 268 """Returns the signal level of this DUT's WiFi interface. 269 270 @return int signal level of connected WiFi interface or None (e.g. -67). 271 272 """ 273 return self._interface.signal_level 274 275 @property 276 def wifi_signal_level_all_chains(self): 277 """Returns the signal level of all chains of this DUT's WiFi interface. 278 279 @return int array signal level of each chain of connected WiFi interface 280 or None (e.g. [-67, -60]). 281 282 """ 283 return self._interface.signal_level_all_chains 284 285 @staticmethod 286 def assert_bsses_include_ssids(found_bsses, expected_ssids): 287 """Verifies that |found_bsses| includes |expected_ssids|. 288 289 @param found_bsses list of IwBss objects. 290 @param expected_ssids list of string SSIDs. 291 @raise error.TestFail if any element of |expected_ssids| is not found. 292 293 """ 294 for ssid in expected_ssids: 295 if not ssid: 296 continue 297 298 for bss in found_bsses: 299 if bss.ssid == ssid: 300 break 301 else: 302 raise error.TestFail('SSID %s is not in scan results: %r' % 303 (ssid, found_bsses)) 304 305 306 def wifi_noise_level(self, frequency_mhz): 307 """Returns the noise level of this DUT's WiFi interface. 308 309 @param frequency_mhz: frequency at which the noise level should be 310 measured and reported. 311 @return int signal level of connected WiFi interface in dBm (e.g. -67) 312 or None if the value is unavailable. 313 314 """ 315 return self._interface.noise_level(frequency_mhz) 316 317 318 def __init__(self, client_host, result_dir, use_wpa_cli): 319 """ 320 Construct a WiFiClient. 321 322 @param client_host host object representing a remote host. 323 @param result_dir string directory to store test logs/packet caps. 324 @param use_wpa_cli bool True if we want to use |wpa_cli| commands for 325 Android testing. 326 327 """ 328 super(WiFiClient, self).__init__(client_host, 'client', 329 inherit_interfaces=True) 330 self._command_ip = 'ip' 331 self._command_iptables = 'iptables' 332 self._command_ping6 = 'ping6' 333 self._command_wpa_cli = 'wpa_cli' 334 self._machine_id = None 335 self._result_dir = result_dir 336 self._conductive = None 337 338 if _is_eureka_host(self.host) and use_wpa_cli: 339 # Look up the WiFi device (and its MAC) on the client. 340 devs = self.iw_runner.list_interfaces(desired_if_type='managed') 341 devs = [dev for dev in devs 342 if dev.if_name not in self.WIFI_IF_BLACKLIST] 343 if not devs: 344 raise error.TestFail('No wlan devices found on %s.' % 345 self.host.hostname) 346 347 if len(devs) > 1: 348 logging.warning('Warning, found multiple WiFi devices on ' 349 '%s: %r', self.host.hostname, devs) 350 self._wifi_if = devs[0].if_name 351 self._shill_proxy = wpa_cli_proxy.WpaCliProxy( 352 self.host, self._wifi_if) 353 self._wpa_cli_proxy = self._shill_proxy 354 else: 355 self._shill_proxy = get_xmlrpc_proxy(self.host) 356 interfaces = self._shill_proxy.list_controlled_wifi_interfaces() 357 if not interfaces: 358 logging.debug('No interfaces managed by shill. Rebooting host') 359 self.host.reboot() 360 raise error.TestError('No interfaces managed by shill on %s' % 361 self.host.hostname) 362 self._wifi_if = interfaces[0] 363 self._wpa_cli_proxy = wpa_cli_proxy.WpaCliProxy( 364 self.host, self._wifi_if) 365 self._raise_logging_level() 366 self._interface = interface.Interface(self._wifi_if, host=self.host) 367 self._wpa_mon = wpa_mon.WpaMon(self.host, self.wifi_if) 368 logging.debug('WiFi interface is: %r', 369 self._interface.device_description) 370 self._firewall_rules = [] 371 # All tests that use this object assume the interface starts enabled. 372 self.set_device_enabled(self._wifi_if, True) 373 # Turn off powersave mode by default. 374 self.powersave_switch(False) 375 # Invoke the |capabilities| property defined in the parent |Linuxsystem| 376 # to workaround the lazy loading of the capabilities cache and supported 377 # frequency list. This is needed for tests that may need access to these 378 # when the DUT is unreachable (for ex: suspended). 379 #pylint: disable=pointless-statement 380 self.capabilities 381 382 383 def _assert_method_supported(self, method_name): 384 """Raise a TestNAError if the XMLRPC proxy has no method |method_name|. 385 386 @param method_name: string name of method that should exist on the 387 XMLRPC proxy. 388 389 """ 390 if not self._supports_method(method_name): 391 raise error.TestNAError('%s() is not supported' % method_name) 392 393 394 def _raise_logging_level(self): 395 """Raises logging levels for WiFi on DUT.""" 396 self.host.run('ff_debug --level -2', ignore_status=True) 397 self.host.run('ff_debug +wifi', ignore_status=True) 398 399 400 def is_vht_supported(self): 401 """Returns True if VHT supported; False otherwise""" 402 return self.CAPABILITY_VHT in self.capabilities 403 404 405 def is_5ghz_supported(self): 406 """Returns True if 5Ghz bands are supported; False otherwise.""" 407 return self.CAPABILITY_5GHZ in self.capabilities 408 409 410 def is_ibss_supported(self): 411 """Returns True if IBSS mode is supported; False otherwise.""" 412 return self.CAPABILITY_IBSS in self.capabilities 413 414 415 def is_frequency_supported(self, frequency): 416 """Returns True if the given frequency is supported; False otherwise. 417 418 @param frequency: int Wifi frequency to check if it is supported by 419 DUT. 420 """ 421 return frequency in self.phys_for_frequency 422 423 424 def _supports_method(self, method_name): 425 """Checks if |method_name| is supported on the remote XMLRPC proxy. 426 427 autotest will, for their own reasons, install python files in the 428 autotest client package that correspond the version of the build 429 rather than the version running on the autotest drone. This 430 creates situations where we call methods on the client XMLRPC proxy 431 that don't exist in that version of the code. This detects those 432 situations so that we can degrade more or less gracefully. 433 434 @param method_name: string name of method that should exist on the 435 XMLRPC proxy. 436 @return True if method is available, False otherwise. 437 438 """ 439 supported = (_is_eureka_host(self.host) 440 or method_name in self._shill_proxy.system.listMethods()) 441 if not supported: 442 logging.warning('%s() is not supported on older images', 443 method_name) 444 return supported 445 446 447 def close(self): 448 """Tear down state associated with the client.""" 449 self.stop_capture() 450 self.powersave_switch(False) 451 self.shill.clean_profiles() 452 super(WiFiClient, self).close() 453 454 455 def firewall_open(self, proto, src): 456 """Opens up firewall to run netperf tests. 457 458 By default, we have a firewall rule for NFQUEUE (see crbug.com/220736). 459 In order to run netperf test, we need to add a new firewall rule BEFORE 460 this NFQUEUE rule in the INPUT chain. 461 462 @param proto a string, test traffic protocol, e.g. udp, tcp. 463 @param src a string, subnet/mask. 464 465 @return a string firewall rule added. 466 467 """ 468 rule = 'INPUT -s %s/32 -p %s -m %s -j ACCEPT' % (src, proto, proto) 469 self.host.run('%s -I %s' % (self._command_iptables, rule)) 470 self._firewall_rules.append(rule) 471 return rule 472 473 474 def firewall_cleanup(self): 475 """Cleans up firewall rules.""" 476 for rule in self._firewall_rules: 477 self.host.run('%s -D %s' % (self._command_iptables, rule)) 478 self._firewall_rules = [] 479 480 481 def sync_host_times(self): 482 """Set time on our DUT to match local time.""" 483 epoch_seconds = time.time() 484 self.shill.sync_time_to(epoch_seconds) 485 486 487 def collect_debug_info(self, local_save_dir_prefix): 488 """Collect any debug information needed from the DUT 489 490 This invokes the |collect_debug_info| RPC method to trigger 491 bugreport/logcat collection and then transfers the logs to the 492 server. 493 494 @param local_save_dir_prefix Used as a prefix for local save directory. 495 """ 496 pass 497 498 499 def check_iw_link_value(self, iw_link_key, desired_value): 500 """Assert that the current wireless link property is |desired_value|. 501 502 @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner. 503 @param desired_value string desired value of iw link property. 504 505 """ 506 actual_value = self.get_iw_link_value(iw_link_key) 507 desired_value = str(desired_value) 508 if actual_value != desired_value: 509 raise error.TestFail('Wanted iw link property %s value %s, but ' 510 'got %s instead.' % (iw_link_key, 511 desired_value, 512 actual_value)) 513 514 515 def get_iw_link_value(self, iw_link_key): 516 """Get the current value of a link property for this WiFi interface. 517 518 @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner. 519 520 """ 521 return self.iw_runner.get_link_value(self.wifi_if, iw_link_key) 522 523 524 def powersave_switch(self, turn_on): 525 """Toggle powersave mode for the DUT. 526 527 @param turn_on bool True iff powersave mode should be turned on. 528 529 """ 530 mode = 'off' 531 if turn_on: 532 mode = 'on' 533 # Turn ON interface and set power_save option. 534 self.host.run('ifconfig %s up' % self.wifi_if) 535 self.host.run('iw dev %s set power_save %s' % (self.wifi_if, mode)) 536 537 538 def timed_scan(self, frequencies, ssids, scan_timeout_seconds=10, 539 retry_timeout_seconds=10): 540 """Request timed scan to discover given SSIDs. 541 542 This method will retry for a default of |retry_timeout_seconds| until it 543 is able to successfully kick off a scan. Sometimes, the device on the 544 DUT claims to be busy and rejects our requests. It will raise error 545 if the scan did not complete within |scan_timeout_seconds| or it was 546 not able to discover the given SSIDs. 547 548 @param frequencies list of int WiFi frequencies to scan for. 549 @param ssids list of string ssids to probe request for. 550 @param scan_timeout_seconds: float number of seconds the scan 551 operation not to exceed. 552 @param retry_timeout_seconds: float number of seconds to retry scanning 553 if the interface is busy. This does not retry if certain 554 SSIDs are missing from the results. 555 @return time in seconds took to complete scan request. 556 557 """ 558 # the poll method returns the result of the func 559 scan_result = utils.poll_for_condition( 560 condition=lambda: self.iw_runner.timed_scan( 561 self.wifi_if, 562 frequencies=frequencies, 563 ssids=ssids), 564 exception=error.TestFail('Unable to trigger scan on client'), 565 timeout=retry_timeout_seconds, 566 sleep_interval=0.5) 567 # Verify scan operation completed within given timeout 568 if scan_result.time > scan_timeout_seconds: 569 raise error.TestFail('Scan time %.2fs exceeds the scan timeout' % 570 (scan_result.time)) 571 572 # Verify all ssids are discovered 573 self.assert_bsses_include_ssids(scan_result.bss_list, ssids) 574 575 logging.info('Wifi scan completed in %.2f seconds', scan_result.time) 576 return scan_result.time 577 578 579 def scan(self, frequencies, ssids, timeout_seconds=10, require_match=True): 580 """Request a scan and (optionally) check that requested SSIDs appear in 581 the results. 582 583 This method will retry for a default of |timeout_seconds| until it is 584 able to successfully kick off a scan. Sometimes, the device on the DUT 585 claims to be busy and rejects our requests. 586 587 If |ssids| is non-empty, we will speficially probe for those SSIDs. 588 589 If |require_match| is True, we will verify that every element 590 of |ssids| was found in scan results. 591 592 @param frequencies list of int WiFi frequencies to scan for. 593 @param ssids list of string ssids to probe request for. 594 @param timeout_seconds: float number of seconds to retry scanning 595 if the interface is busy. This does not retry if certain 596 SSIDs are missing from the results. 597 @param require_match: bool True if we must find |ssids|. 598 599 """ 600 bss_list = utils.poll_for_condition( 601 condition=lambda: self.iw_runner.scan( 602 self.wifi_if, 603 frequencies=frequencies, 604 ssids=ssids), 605 exception=error.TestFail('Unable to trigger scan on client'), 606 timeout=timeout_seconds, 607 sleep_interval=0.5) 608 609 if require_match: 610 self.assert_bsses_include_ssids(bss_list, ssids) 611 612 613 def wait_for_bss(self, bssid, timeout_seconds=15): 614 """Wait for a specific BSS to appear in the scan results. 615 616 @param bssid: string bssid of AP we expect to see in scan results 617 @param timeout_seconds int seconds to wait for BSSes to be discovered 618 619 """ 620 def dut_sees_bss(): 621 """Check if a DUT can see a BSS in scan results. 622 623 @return True iff scan results from DUT include the specified BSS. 624 625 """ 626 is_requested_bss = lambda iw_bss: iw_bss.bss == bssid 627 scan_results = self.iw_runner.scan(self.wifi_if) 628 return scan_results and filter(is_requested_bss, scan_results) 629 try: 630 utils.poll_for_condition( 631 condition=dut_sees_bss, 632 timeout=timeout_seconds, 633 sleep_interval=0.5) 634 except: 635 raise error.TestFail('Failed to discover BSS %s' % bssid) 636 637 638 def wait_for_bsses(self, ssid, num_bss_expected, timeout_seconds=15): 639 """Wait for all BSSes associated with given SSID to be discovered in the 640 scan. 641 642 @param ssid string name of network being queried 643 @param num_bss_expected int number of BSSes expected 644 @param timeout_seconds int seconds to wait for BSSes to be discovered 645 646 """ 647 # If the scan returns None, return 0, else return the matching count 648 649 # Wrap num_bss_actual as a mutable object, list, so that an inner function 650 # can update the value without making an assignment to it. Without any 651 # assignment, the inner function will look for the variable in outer scope 652 # instead of creating a new local one. 653 num_bss_actual = [0] 654 def are_all_bsses_discovered(): 655 """Determine if all BSSes associated with the SSID from parent 656 function are discovered in the scan 657 658 @return boolean representing whether the expected bss count matches 659 how many in the scan match the given ssid 660 """ 661 self.claim_wifi_if() # Stop shill/supplicant scans 662 try: 663 scan_results = self.iw_runner.scan( 664 self.wifi_if, 665 frequencies=[], 666 ssids=[ssid]) 667 if scan_results is None: 668 return False 669 num_bss_actual[0] = sum(ssid == bss.ssid for bss in scan_results) 670 return num_bss_expected == num_bss_actual[0] 671 finally: 672 self.release_wifi_if() 673 try: 674 utils.poll_for_condition( 675 condition=are_all_bsses_discovered, 676 timeout=timeout_seconds, 677 sleep_interval=0.5) 678 except utils.TimeoutError: 679 raise error.TestFail('Failed to discover all BSSes. Found %d,' 680 ' wanted %d with SSID %s' % 681 (num_bss_actual[0], num_bss_expected, ssid)) 682 683 def wait_for_service_states(self, ssid, states, timeout_seconds): 684 """Waits for a WiFi service to achieve one of |states|. 685 686 @param ssid string name of network being queried 687 @param states tuple list of states for which the caller is waiting 688 @param timeout_seconds int seconds to wait for a state in |states| 689 690 """ 691 logging.info('Waiting for %s to reach one of %r...', ssid, states) 692 success, state, duration = self._shill_proxy.wait_for_service_states( 693 ssid, states, timeout_seconds) 694 logging.info('...ended up in state \'%s\' (%s) after %f seconds.', 695 state, 'success' if success else 'failure', duration) 696 return success, state, duration 697 698 699 def do_suspend(self, seconds): 700 """Puts the DUT in suspend power state for |seconds| seconds. 701 702 @param seconds: The number of seconds to suspend the device. 703 704 """ 705 logging.info('Suspending DUT for %d seconds...', seconds) 706 self._shill_proxy.do_suspend(seconds) 707 logging.info('...done suspending') 708 709 710 def do_suspend_bg(self, seconds): 711 """Suspend DUT using the power manager - non-blocking. 712 713 @param seconds: The number of seconds to suspend the device. 714 715 """ 716 logging.info('Suspending DUT (in background) for %d seconds...', 717 seconds) 718 self._shill_proxy.do_suspend_bg(seconds) 719 720 721 def flush_bss(self, age=0): 722 """Flush supplicant's cached BSS on the DUT. 723 724 @param age: BSS older than |age| seconds will be removed from the cache. 725 """ 726 result = self._wpa_cli_proxy.run_wpa_cli_cmd('bss_flush %d' % age, 727 check_result=False); 728 logging.info('wpa_cli bss_flush %d: out:%r err:%r', age, result.stdout, 729 result.stderr) 730 return result.stdout, result.stderr 731 732 733 def clear_supplicant_blacklist(self): 734 """Clear's the AP blacklist on the DUT. 735 736 @return stdout and stderror returns passed from wpa_cli command. 737 738 """ 739 result = self._wpa_cli_proxy.run_wpa_cli_cmd('blacklist clear', 740 check_result=False); 741 logging.info('wpa_cli blacklist clear: out:%r err:%r', result.stdout, 742 result.stderr) 743 return result.stdout, result.stderr 744 745 746 def get_active_wifi_SSIDs(self): 747 """Get a list of visible SSID's around the DUT 748 749 @return list of string SSIDs 750 751 """ 752 self._assert_method_supported('get_active_wifi_SSIDs') 753 return self._shill_proxy.get_active_wifi_SSIDs() 754 755 756 def set_device_enabled(self, wifi_interface, value, 757 fail_on_unsupported=False): 758 """Enable or disable the WiFi device. 759 760 @param wifi_interface: string name of interface being modified. 761 @param enabled: boolean; true if this device should be enabled, 762 false if this device should be disabled. 763 @return True if it worked; False, otherwise. 764 765 """ 766 if fail_on_unsupported: 767 self._assert_method_supported('set_device_enabled') 768 elif not self._supports_method('set_device_enabled'): 769 return False 770 return self._shill_proxy.set_device_enabled(wifi_interface, value) 771 772 773 def add_arp_entry(self, ip_address, mac_address): 774 """Add an ARP entry to the table associated with the WiFi interface. 775 776 @param ip_address: string IP address associated with the new ARP entry. 777 @param mac_address: string MAC address associated with the new ARP 778 entry. 779 780 """ 781 self.host.run('ip neigh add %s lladdr %s dev %s nud perm' % 782 (ip_address, mac_address, self.wifi_if)) 783 784 def add_wake_packet_source(self, source_ip): 785 """Add |source_ip| as a source that can wake us up with packets. 786 787 @param source_ip: IP address from which to wake upon receipt of packets 788 789 @return True if successful, False otherwise. 790 791 """ 792 return self._shill_proxy.add_wake_packet_source( 793 self.wifi_if, source_ip) 794 795 796 def remove_wake_packet_source(self, source_ip): 797 """Remove |source_ip| as a source that can wake us up with packets. 798 799 @param source_ip: IP address to stop waking on packets from 800 801 @return True if successful, False otherwise. 802 803 """ 804 return self._shill_proxy.remove_wake_packet_source( 805 self.wifi_if, source_ip) 806 807 808 def remove_all_wake_packet_sources(self): 809 """Remove all IPs as sources that can wake us up with packets. 810 811 @return True if successful, False otherwise. 812 813 """ 814 return self._shill_proxy.remove_all_wake_packet_sources(self.wifi_if) 815 816 817 def wake_on_wifi_features(self, features): 818 """Shill supports programming the NIC to wake on special kinds of 819 incoming packets, or on changes to the available APs (disconnect, 820 coming in range of a known SSID). This method allows you to configure 821 what wake-on-WiFi mechanisms are active. It returns a context manager, 822 because this is a system-wide setting and we don't want it to persist 823 across different tests. 824 825 If you enable wake-on-packet, then the IPs registered by 826 add_wake_packet_source will be able to wake the system from suspend. 827 828 The correct way to use this method is: 829 830 with client.wake_on_wifi_features(WAKE_ON_WIFI_DARKCONNECT): 831 ... 832 833 @param features: string from the WAKE_ON_WIFI constants above. 834 835 @return a context manager for the features. 836 837 """ 838 return TemporaryDeviceDBusProperty(self._shill_proxy, 839 self.wifi_if, 840 self.WAKE_ON_WIFI_FEATURES, 841 features) 842 843 844 def net_detect_scan_period_seconds(self, period): 845 """Sets the period between net detect scans performed by the NIC to look 846 for allowlisted SSIDs to |period|. This setting only takes effect if the 847 NIC is programmed to wake on SSID. 848 849 The correct way to use this method is: 850 851 with client.net_detect_scan_period_seconds(60): 852 ... 853 854 @param period: integer number of seconds between NIC net detect scans 855 856 @return a context manager for the net detect scan period 857 858 """ 859 return TemporaryDeviceDBusProperty(self._shill_proxy, 860 self.wifi_if, 861 self.NET_DETECT_SCAN_PERIOD, 862 period) 863 864 865 def wake_to_scan_period_seconds(self, period): 866 """Sets the period between RTC timer wakeups where the system is woken 867 from suspend to perform scans. This setting only takes effect if the 868 NIC is programmed to wake on SSID. Use as with 869 net_detect_scan_period_seconds. 870 871 @param period: integer number of seconds between wake to scan RTC timer 872 wakes. 873 874 @return a context manager for the net detect scan period 875 876 """ 877 return TemporaryDeviceDBusProperty(self._shill_proxy, 878 self.wifi_if, 879 self.WAKE_TO_SCAN_PERIOD, 880 period) 881 882 883 def force_wake_to_scan_timer(self, is_forced): 884 """Sets the boolean value determining whether or not to force the use of 885 the wake to scan RTC timer, which wakes the system from suspend 886 periodically to scan for networks. 887 888 @param is_forced: boolean whether or not to force the use of the wake to 889 scan timer 890 891 @return a context manager for the net detect scan period 892 893 """ 894 return TemporaryDeviceDBusProperty(self._shill_proxy, 895 self.wifi_if, 896 self.FORCE_WAKE_TO_SCAN_TIMER, 897 is_forced) 898 899 900 def mac_address_randomization(self, enabled): 901 """Sets the boolean value determining whether or not to enable MAC 902 address randomization. This instructs the NIC to randomize the last 903 three octets of the MAC address used in probe requests while 904 disconnected to make the DUT harder to track. 905 906 If MAC address randomization is not supported on this DUT and the 907 caller tries to turn it on, this raises a TestNAError. 908 909 @param enabled: boolean whether or not to enable MAC address 910 randomization 911 912 @return a context manager for the MAC address randomization property 913 914 """ 915 if not self._shill_proxy.get_dbus_property_on_device( 916 self.wifi_if, self.MAC_ADDRESS_RANDOMIZATION_SUPPORTED): 917 if enabled: 918 raise error.TestNAError( 919 'MAC address randomization not supported') 920 else: 921 # Return a no-op context manager. 922 return contextlib.nested() 923 924 return TemporaryDeviceDBusProperty( 925 self._shill_proxy, 926 self.wifi_if, 927 self.MAC_ADDRESS_RANDOMIZATION_ENABLED, 928 enabled) 929 930 931 def request_roam(self, bssid): 932 """Request that we roam to the specified BSSID. 933 934 Note that this operation assumes that: 935 936 1) We're connected to an SSID for which |bssid| is a member. 937 2) There is a BSS with an appropriate ID in our scan results. 938 939 This method does not check for success of either the command or 940 the roaming operation. 941 942 @param bssid: string MAC address of bss to roam to. 943 944 """ 945 self._wpa_cli_proxy.run_wpa_cli_cmd('roam %s' % bssid, 946 check_result=False); 947 return True 948 949 950 def request_roam_dbus(self, bssid, iface): 951 """Request that we roam to the specified BSSID through dbus. 952 953 Note that this operation assumes that: 954 955 1) We're connected to an SSID for which |bssid| is a member. 956 2) There is a BSS with an appropriate ID in our scan results. 957 958 @param bssid: string MAC address of bss to roam to. 959 @param iface: interface to use 960 @return True if the roam was initiated successfully. Note that this 961 does not guarantee the roam completed successfully. 962 963 """ 964 self._assert_method_supported('request_roam_dbus') 965 return self._shill_proxy.request_roam_dbus(bssid, iface) 966 967 968 def wait_for_roam(self, bssid, timeout_seconds=10.0): 969 """Wait for a roam to the given |bssid|. 970 971 @param bssid: string bssid to expect a roam to 972 (e.g. '00:11:22:33:44:55'). 973 @param timeout_seconds: float number of seconds to wait for a roam. 974 @return True if we detect an association to the given |bssid| within 975 |timeout_seconds|. 976 977 """ 978 def attempt_roam(): 979 """Perform a roam to the given |bssid| 980 981 @return True if there is an assocation between the given |bssid| 982 and that association is detected within the timeout frame 983 """ 984 current_bssid = self.iw_runner.get_current_bssid(self.wifi_if) 985 logging.debug('Current BSSID is %s.', current_bssid) 986 return current_bssid == bssid 987 988 start_time = time.time() 989 duration = 0.0 990 try: 991 success = utils.poll_for_condition( 992 condition=attempt_roam, 993 timeout=timeout_seconds, 994 sleep_interval=0.5, 995 desc='Wait for a roam to the given bssid') 996 # wait_for_roam should return False on timeout 997 except utils.TimeoutError: 998 success = False 999 1000 duration = time.time() - start_time 1001 logging.debug('%s to %s in %f seconds.', 1002 'Roamed ' if success else 'Failed to roam ', 1003 bssid, 1004 duration) 1005 return success 1006 1007 1008 def wait_for_ssid_vanish(self, ssid): 1009 """Wait for shill to notice that there are no BSS's for an SSID present. 1010 1011 Raise a test failing exception if this does not come to pass. 1012 1013 @param ssid: string SSID of the network to require be missing. 1014 1015 """ 1016 def is_missing_yet(): 1017 """Determine if the ssid is removed from the service list yet 1018 1019 @return True if the ssid is not found in the service list 1020 """ 1021 visible_ssids = self.get_active_wifi_SSIDs() 1022 logging.info('Got service list: %r', visible_ssids) 1023 if ssid not in visible_ssids: 1024 return True 1025 self.scan(frequencies=[], ssids=[], timeout_seconds=30) 1026 1027 utils.poll_for_condition( 1028 condition=is_missing_yet, # func 1029 exception=error.TestFail('shill should mark BSS not present'), 1030 timeout=self.MAX_SERVICE_GONE_TIMEOUT_SECONDS, 1031 sleep_interval=0) 1032 1033 def reassociate(self, timeout_seconds=10): 1034 """Reassociate to the connected network. 1035 1036 @param timeout_seconds: float number of seconds to wait for operation 1037 to complete. 1038 1039 """ 1040 logging.info('Attempt to reassociate') 1041 with self.iw_runner.get_event_logger() as logger: 1042 logger.start() 1043 # Issue reattach command to wpa_supplicant 1044 self._wpa_cli_proxy.run_wpa_cli_cmd('reattach'); 1045 1046 # Wait for the timeout seconds for association to complete 1047 time.sleep(timeout_seconds) 1048 1049 # Stop iw event logger 1050 logger.stop() 1051 1052 # Get association time based on the iw event log 1053 reassociate_time = logger.get_reassociation_time() 1054 if reassociate_time is None or reassociate_time > timeout_seconds: 1055 raise error.TestFail( 1056 'Failed to reassociate within given timeout') 1057 logging.info('Reassociate time: %.2f seconds', reassociate_time) 1058 1059 1060 def wait_for_connection(self, ssid, timeout_seconds=30, freq=None, 1061 ping_ip=None, desired_subnet=None): 1062 """Verifies a connection to network ssid, optionally verifying 1063 frequency, ping connectivity and subnet. 1064 1065 @param ssid string ssid of the network to check. 1066 @param timeout_seconds int number of seconds to wait for 1067 connection on the given frequency. 1068 @param freq int frequency of network to check. 1069 @param ping_ip string ip address to ping for verification. 1070 @param desired_subnet string expected subnet in which client 1071 ip address should reside. 1072 1073 @returns a named tuple of (state, time) 1074 """ 1075 start_time = time.time() 1076 duration = lambda: time.time() - start_time 1077 1078 # Wrap state as a mutable object, list, so that an inner function can 1079 # update the value without making an assignment to it. Without any 1080 # assignment, the inner function will look for the variable in outer 1081 # scope instead of creating a new local one. 1082 state = [None] 1083 def verify_connection(): 1084 """Verify the connection and perform optional operations 1085 as defined in the parent function 1086 1087 @return False if there is a failure in the connection or 1088 the prescribed verification steps 1089 The named tuple ConnectTime otherwise, with the state 1090 and connection time from waiting for service states 1091 """ 1092 success, state[0], conn_time = self.wait_for_service_states( 1093 ssid, 1094 self.CONNECTED_STATES, 1095 int(math.ceil(timeout_seconds - duration()))) 1096 if not success: 1097 return False 1098 1099 if freq: 1100 actual_freq = self.get_iw_link_value( 1101 iw_runner.IW_LINK_KEY_FREQUENCY) 1102 if str(freq) != actual_freq: 1103 logging.debug( 1104 'Waiting for desired frequency %s (got %s).', 1105 freq, 1106 actual_freq) 1107 return False 1108 1109 if desired_subnet: 1110 actual_subnet = self.wifi_ip_subnet 1111 if actual_subnet != desired_subnet: 1112 logging.debug( 1113 'Waiting for desired subnet %s (got %s).', 1114 desired_subnet, 1115 actual_subnet) 1116 return False 1117 1118 if ping_ip: 1119 ping_config = ping_runner.PingConfig(ping_ip) 1120 self.ping(ping_config) 1121 1122 return ConnectTime(state[0], conn_time) 1123 1124 freq_error_str = (' on frequency %d Mhz' % freq) if freq else '' 1125 1126 try: 1127 ret = utils.poll_for_condition( 1128 condition=verify_connection, 1129 timeout=timeout_seconds, 1130 sleep_interval=0) 1131 except utils.TimeoutError: 1132 raise error.TestFail( 1133 'Failed to connect to "%s"%s in %f seconds (state=%s)' % 1134 (ssid, 1135 freq_error_str, 1136 duration(), 1137 state[0])) 1138 return ret 1139 1140 @contextmanager 1141 def assert_disconnect_count(self, count): 1142 """Context asserting |count| disconnects for the context lifetime. 1143 1144 Creates an iw logger during the lifetime of the context and asserts 1145 that the client disconnects exactly |count| times. 1146 1147 @param count int the expected number of disconnections. 1148 1149 """ 1150 with self.iw_runner.get_event_logger() as logger: 1151 logger.start() 1152 yield 1153 logger.stop() 1154 if logger.get_disconnect_count() != count: 1155 raise error.TestFail( 1156 'Client disconnected %d times; expected %d' % 1157 (logger.get_disconnect_count(), count)) 1158 1159 1160 def assert_no_disconnects(self): 1161 """Context asserting no disconnects for the context lifetime.""" 1162 return self.assert_disconnect_count(0) 1163 1164 1165 @contextmanager 1166 def assert_disconnect_event(self): 1167 """Context asserting at least one disconnect for the context lifetime. 1168 1169 Creates an iw logger during the lifetime of the context and asserts 1170 that the client disconnects at least one time. 1171 1172 """ 1173 with self.iw_runner.get_event_logger() as logger: 1174 logger.start() 1175 yield 1176 logger.stop() 1177 if logger.get_disconnect_count() == 0: 1178 raise error.TestFail('Client did not disconnect') 1179 1180 1181 def get_num_card_resets(self): 1182 """Get card reset count.""" 1183 reset_msg = '[m]wifiex_sdio_card_reset' 1184 result = self.host.run('grep -c %s /var/log/messages' % reset_msg, 1185 ignore_status=True) 1186 if result.exit_status == 1: 1187 return 0 1188 count = int(result.stdout.strip()) 1189 return count 1190 1191 1192 def get_disconnect_reasons(self): 1193 """Get disconnect reason codes.""" 1194 disconnect_reason_msg = "updated DisconnectReason " 1195 disconnect_reason_cleared = "clearing DisconnectReason for " 1196 result = self.host.run('grep -a -E "(%s|%s)" /var/log/net.log' % 1197 (disconnect_reason_msg, 1198 disconnect_reason_cleared), 1199 ignore_status=True) 1200 if result.exit_status == 1: 1201 return None 1202 1203 lines = result.stdout.strip().split('\n') 1204 disconnect_reasons = [] 1205 disconnect_reason_regex = re.compile(' to (\D?\d+)') 1206 1207 found = False 1208 for line in reversed(lines): 1209 match = disconnect_reason_regex.search(line) 1210 if match is not None: 1211 disconnect_reasons.append(match.group(1)) 1212 found = True 1213 else: 1214 if (found): 1215 break 1216 return list(reversed(disconnect_reasons)) 1217 1218 1219 def release_wifi_if(self): 1220 """Release the control over the wifi interface back to normal operation. 1221 1222 This will give the ownership of the wifi interface back to shill and 1223 wpa_supplicant. 1224 1225 """ 1226 self.set_device_enabled(self._wifi_if, True) 1227 1228 1229 def claim_wifi_if(self): 1230 """Claim the control over the wifi interface from this wifi client. 1231 1232 This claim the ownership of the wifi interface from shill and 1233 wpa_supplicant. The wifi interface should be UP when this call returns. 1234 1235 """ 1236 # Disabling a wifi device in shill will remove that device from 1237 # wpa_supplicant as well. 1238 self.set_device_enabled(self._wifi_if, False) 1239 1240 # Wait for shill to bring down the wifi interface. 1241 is_interface_down = lambda: not self._interface.is_up 1242 utils.poll_for_condition( 1243 is_interface_down, 1244 timeout=INTERFACE_DOWN_WAIT_TIME_SECONDS, 1245 sleep_interval=0.5, 1246 desc='Timeout waiting for interface to go down.') 1247 # Bring up the wifi interface to allow the test to use the interface. 1248 self.host.run('%s link set %s up' % (self.cmd_ip, self.wifi_if)) 1249 1250 1251 def set_sched_scan(self, enable, fail_on_unsupported=False): 1252 """enable/disable scheduled scan. 1253 1254 @param enable bool flag indicating to enable/disable scheduled scan. 1255 1256 """ 1257 if fail_on_unsupported: 1258 self._assert_method_supported('set_sched_scan') 1259 elif not self._supports_method('set_sched_scan'): 1260 return False 1261 return self._shill_proxy.set_sched_scan(enable) 1262 1263 1264 def check_connected_on_last_resume(self): 1265 """Checks whether the DUT was connected on its last resume. 1266 1267 Checks that the DUT was connected after waking from suspend by parsing 1268 the last instance shill log message that reports shill's connection 1269 status on resume. Fails the test if this log message reports that 1270 the DUT woke up disconnected. 1271 1272 """ 1273 # As of build R43 6913.0.0, the shill log message from the function 1274 # OnAfterResume is called as soon as shill resumes from suspend, and 1275 # will report whether or not shill is connected. The log message will 1276 # take one of the following two forms: 1277 # 1278 # [...] [INFO:wifi.cc(1941)] OnAfterResume: connected 1279 # [...] [INFO:wifi.cc(1941)] OnAfterResume: not connected 1280 # 1281 # where 1941 is an arbitrary PID number. By checking if the last 1282 # instance of this message contains the substring "not connected", we 1283 # can determine whether or not shill was connected on its last resume. 1284 connection_status_log_regex_str = 'INFO:wifi\.cc.*OnAfterResume' 1285 not_connected_substr = 'not connected' 1286 connected_substr = 'connected' 1287 1288 cmd = ('grep -E %s /var/log/net.log | tail -1' % 1289 connection_status_log_regex_str) 1290 connection_status_log = self.host.run(cmd).stdout 1291 if not connection_status_log: 1292 raise error.TestFail('Could not find resume connection status log ' 1293 'message.') 1294 1295 logging.debug('Connection status message:\n%s', connection_status_log) 1296 if not_connected_substr in connection_status_log: 1297 raise error.TestFail('Client was not connected upon waking from ' 1298 'suspend.') 1299 1300 if not connected_substr in connection_status_log: 1301 raise error.TestFail('Last resume log message did not contain ' 1302 'connection status.') 1303 1304 logging.info('Client was connected upon waking from suspend.') 1305 1306 1307 def check_wake_on_wifi_throttled(self): 1308 """ 1309 Checks whether wake on WiFi was throttled on the DUT on the last dark 1310 resume. Check for this by parsing shill logs for a throttling message. 1311 1312 """ 1313 # We are looking for an dark resume error log message indicating that 1314 # wake on WiFi was throttled. This is an example of the error message: 1315 # [...] [ERROR:wake_on_wifi.cc(1304)] OnDarkResume: Too many dark \ 1316 # resumes; disabling wake on WiFi temporarily 1317 dark_resume_log_regex_str = 'ERROR:wake_on_wifi\.cc.*OnDarkResume:.*' 1318 throttled_msg_substr = ('Too many dark resumes; disabling wake on ' 1319 'WiFi temporarily') 1320 1321 cmd = ('grep -E %s /var/log/net.log | tail -1' % 1322 dark_resume_log_regex_str) 1323 last_dark_resume_error_log = self.host.run(cmd).stdout 1324 if not last_dark_resume_error_log: 1325 raise error.TestFail('Could not find a dark resume log message.') 1326 1327 logging.debug('Last dark resume log message:\n%s', 1328 last_dark_resume_error_log) 1329 if not throttled_msg_substr in last_dark_resume_error_log: 1330 raise error.TestFail('Wake on WiFi was not throttled on the last ' 1331 'dark resume.') 1332 1333 logging.info('Wake on WiFi was throttled on the last dark resume.') 1334 1335 1336 def shill_debug_log(self, message): 1337 """Logs a message to the shill log (i.e. /var/log/net.log). This 1338 message will be logged at the DEBUG level. 1339 1340 @param message: the message to be logged. 1341 1342 """ 1343 logging.info(message) 1344 logger_command = ('/usr/bin/logger' 1345 ' --tag shill' 1346 ' --priority daemon.debug' 1347 ' "%s"' % utils.sh_escape(message)) 1348 self.host.run(logger_command) 1349 1350 1351 def is_wake_on_wifi_supported(self): 1352 """Returns true iff wake-on-WiFi is supported by the DUT.""" 1353 1354 if (self.shill.get_dbus_property_on_device( 1355 self.wifi_if, self.WAKE_ON_WIFI_FEATURES) == 1356 WAKE_ON_WIFI_NOT_SUPPORTED): 1357 return False 1358 return True 1359 1360 1361 def set_manager_property(self, prop_name, prop_value): 1362 """Sets the given manager property to the value provided. 1363 1364 @param prop_name: the property to be set 1365 @param prop_value: value to assign to the prop_name 1366 @return a context manager for the setting 1367 1368 """ 1369 return TemporaryManagerDBusProperty(self._shill_proxy, 1370 prop_name, 1371 prop_value) 1372 1373 1374class TemporaryDeviceDBusProperty: 1375 """Utility class to temporarily change a dbus property for the WiFi device. 1376 1377 Since dbus properties are global and persistent settings, we want 1378 to make sure that we change them back to what they were before the test 1379 started. 1380 1381 """ 1382 1383 def __init__(self, shill_proxy, iface, prop_name, value): 1384 """Construct a TemporaryDeviceDBusProperty context manager. 1385 1386 1387 @param shill_proxy: the shill proxy to use to communicate via dbus 1388 @param iface: device whose property to change (e.g. 'wlan0') 1389 @param prop_name: the name of the property we want to set 1390 @param value: the desired value of the property 1391 1392 """ 1393 self._shill = shill_proxy 1394 self._interface = iface 1395 self._prop_name = prop_name 1396 self._value = value 1397 self._saved_value = None 1398 1399 1400 def __enter__(self): 1401 logging.info('- Setting property %s on device %s', 1402 self._prop_name, 1403 self._interface) 1404 1405 self._saved_value = self._shill.get_dbus_property_on_device( 1406 self._interface, self._prop_name) 1407 if self._saved_value is None: 1408 raise error.TestFail('Device or property not found.') 1409 if not self._shill.set_dbus_property_on_device(self._interface, 1410 self._prop_name, 1411 self._value): 1412 raise error.TestFail('Could not set property') 1413 1414 logging.info('- Changed value from %s to %s', 1415 self._saved_value, 1416 self._value) 1417 1418 1419 def __exit__(self, exception, value, traceback): 1420 logging.info('- Resetting property %s', self._prop_name) 1421 1422 if not self._shill.set_dbus_property_on_device(self._interface, 1423 self._prop_name, 1424 self._saved_value): 1425 raise error.TestFail('Could not reset property') 1426 1427 1428class TemporaryManagerDBusProperty: 1429 """Utility class to temporarily change a Manager dbus property. 1430 1431 Since dbus properties are global and persistent settings, we want 1432 to make sure that we change them back to what they were before the test 1433 started. 1434 1435 """ 1436 1437 def __init__(self, shill_proxy, prop_name, value): 1438 """Construct a TemporaryManagerDBusProperty context manager. 1439 1440 @param shill_proxy: the shill proxy to use to communicate via dbus 1441 @param prop_name: the name of the property we want to set 1442 @param value: the desired value of the property 1443 1444 """ 1445 self._shill = shill_proxy 1446 self._prop_name = prop_name 1447 self._value = value 1448 self._saved_value = None 1449 1450 1451 def __enter__(self): 1452 logging.info('- Setting Manager property: %s', self._prop_name) 1453 1454 self._saved_value = self._shill.get_manager_property( 1455 self._prop_name) 1456 if self._saved_value is None: 1457 self._saved_value = "" 1458 if not self._shill.set_optional_manager_property(self._prop_name, 1459 self._value): 1460 raise error.TestFail('Could not set optional manager property.') 1461 else: 1462 setprop_result = self._shill.set_manager_property(self._prop_name, 1463 self._value) 1464 if not setprop_result: 1465 raise error.TestFail('Could not set manager property') 1466 1467 logging.info('- Changed value from [%s] to [%s]', 1468 self._saved_value, 1469 self._value) 1470 1471 1472 def __exit__(self, exception, value, traceback): 1473 logging.info('- Resetting property %s to [%s]', 1474 self._prop_name, 1475 self._saved_value) 1476 1477 if not self._shill.set_manager_property(self._prop_name, 1478 self._saved_value): 1479 raise error.TestFail('Could not reset manager property') 1480