1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import contextlib 6import logging 7import math 8import re 9import time 10 11from contextlib import contextmanager 12from collections import namedtuple 13 14from autotest_lib.client.common_lib import error 15from autotest_lib.client.common_lib import utils 16from autotest_lib.client.common_lib.cros.network import interface 17from autotest_lib.client.common_lib.cros.network import iw_runner 18from autotest_lib.client.common_lib.cros.network import ping_runner 19from autotest_lib.client.cros import constants 20from autotest_lib.server import autotest 21from autotest_lib.server import site_linux_system 22from autotest_lib.server.cros.network import wpa_cli_proxy 23from autotest_lib.server.hosts import cast_os_host 24 25# Wake-on-WiFi feature strings 26WAKE_ON_WIFI_NONE = 'none' 27WAKE_ON_WIFI_PACKET = 'packet' 28WAKE_ON_WIFI_DARKCONNECT = 'darkconnect' 29WAKE_ON_WIFI_PACKET_DARKCONNECT = 'packet_and_darkconnect' 30WAKE_ON_WIFI_NOT_SUPPORTED = 'not_supported' 31 32# Wake-on-WiFi test timing constants 33SUSPEND_WAIT_TIME_SECONDS = 10 34RECEIVE_PACKET_WAIT_TIME_SECONDS = 10 35DARK_RESUME_WAIT_TIME_SECONDS = 25 36WAKE_TO_SCAN_PERIOD_SECONDS = 30 37NET_DETECT_SCAN_WAIT_TIME_SECONDS = 15 38WAIT_UP_TIMEOUT_SECONDS = 10 39DISCONNECT_WAIT_TIME_SECONDS = 10 40INTERFACE_DOWN_WAIT_TIME_SECONDS = 10 41 42ConnectTime = namedtuple('ConnectTime', 'state, time') 43 44XMLRPC_BRINGUP_TIMEOUT_SECONDS = 60 45SHILL_XMLRPC_LOG_PATH = '/var/log/shill_xmlrpc_server.log' 46 47 48def _is_eureka_host(host): 49 return host.get_os_type() == cast_os_host.OS_TYPE_CAST_OS 50 51 52def get_xmlrpc_proxy(host): 53 """Get a shill XMLRPC proxy for |host|. 54 55 The returned object has no particular type. Instead, when you call 56 a method on the object, it marshalls the objects passed as arguments 57 and uses them to make RPCs on the remote server. Thus, you should 58 read shill_xmlrpc_server.py to find out what methods are supported. 59 60 @param host: host object representing a remote device. 61 @return proxy object for remote XMLRPC server. 62 63 """ 64 # Make sure the client library is on the device so that the proxy 65 # code is there when we try to call it. 66 if host.is_client_install_supported: 67 client_at = autotest.Autotest(host) 68 client_at.install() 69 # This is the default port for shill xmlrpc server. 70 server_port = constants.SHILL_XMLRPC_SERVER_PORT 71 xmlrpc_server_command = constants.SHILL_XMLRPC_SERVER_COMMAND 72 log_path = SHILL_XMLRPC_LOG_PATH 73 command_name = constants.SHILL_XMLRPC_SERVER_CLEANUP_PATTERN 74 rpc_server_host = host 75 76 # Start up the XMLRPC proxy on the client 77 proxy = rpc_server_host.rpc_server_tracker.xmlrpc_connect( 78 xmlrpc_server_command, 79 server_port, 80 command_name=command_name, 81 ready_test_name=constants.SHILL_XMLRPC_SERVER_READY_METHOD, 82 timeout_seconds=XMLRPC_BRINGUP_TIMEOUT_SECONDS, 83 logfile=log_path 84 ) 85 return proxy 86 87 88def _is_conductive(host): 89 """Determine if the host is conductive based on AFE labels. 90 91 @param host: A Host object. 92 """ 93 info = host.host_info_store.get() 94 conductive = info.get_label_value('conductive') 95 return conductive.lower() == 'true' 96 97 98class WiFiClient(site_linux_system.LinuxSystem): 99 """WiFiClient is a thin layer of logic over a remote DUT in wifitests.""" 100 101 DEFAULT_PING_COUNT = 10 102 COMMAND_PING = 'ping' 103 104 MAX_SERVICE_GONE_TIMEOUT_SECONDS = 60 105 106 # List of interface names we won't consider for use as "the" WiFi interface 107 # on Android or CastOS hosts. 108 WIFI_IF_BLACKLIST = ['p2p0', 'wfd0'] 109 110 UNKNOWN_BOARD_TYPE = 'unknown' 111 112 # DBus device properties. Wireless interfaces should support these. 113 ROAM_THRESHOLD = 'RoamThreshold' 114 WAKE_ON_WIFI_FEATURES = 'WakeOnWiFiFeaturesEnabled' 115 NET_DETECT_SCAN_PERIOD = 'NetDetectScanPeriodSeconds' 116 WAKE_TO_SCAN_PERIOD = 'WakeToScanPeriodSeconds' 117 FORCE_WAKE_TO_SCAN_TIMER = 'ForceWakeToScanTimer' 118 MAC_ADDRESS_RANDOMIZATION_SUPPORTED = 'MACAddressRandomizationSupported' 119 MAC_ADDRESS_RANDOMIZATION_ENABLED = 'MACAddressRandomizationEnabled' 120 121 CONNECTED_STATES = ['ready', 'portal', 'online'] 122 123 124 @property 125 def machine_id(self): 126 """@return string unique to a particular board/cpu configuration.""" 127 if self._machine_id: 128 return self._machine_id 129 130 uname_result = self.host.run('uname -m', ignore_status=True) 131 kernel_arch = '' 132 if not uname_result.exit_status and uname_result.stdout.find(' ') < 0: 133 kernel_arch = uname_result.stdout.strip() 134 cpu_info = self.host.run('cat /proc/cpuinfo').stdout.splitlines() 135 cpu_count = len(filter(lambda x: x.lower().startswith('bogomips'), 136 cpu_info)) 137 cpu_count_str = '' 138 if cpu_count: 139 cpu_count_str = 'x%d' % cpu_count 140 ghz_value = '' 141 ghz_pattern = re.compile('([0-9.]+GHz)') 142 for line in cpu_info: 143 match = ghz_pattern.search(line) 144 if match is not None: 145 ghz_value = '_' + match.group(1) 146 break 147 148 return '%s_%s%s%s' % (self.board, kernel_arch, ghz_value, cpu_count_str) 149 150 151 @property 152 def powersave_on(self): 153 """@return bool True iff WiFi powersave mode is enabled.""" 154 result = self.host.run("iw dev %s get power_save" % self.wifi_if) 155 output = result.stdout.rstrip() # NB: chop \n 156 # Output should be either "Power save: on" or "Power save: off". 157 find_re = re.compile('([^:]+):\s+(\w+)') 158 find_results = find_re.match(output) 159 if not find_results: 160 raise error.TestFail('Failed to find power_save parameter ' 161 'in iw results.') 162 163 return find_results.group(2) == 'on' 164 165 166 @property 167 def shill(self): 168 """@return shill RPCProxy object.""" 169 return self._shill_proxy 170 171 172 @property 173 def client(self): 174 """Deprecated accessor for the client host. 175 176 The term client is used very loosely in old autotests and this 177 accessor should not be used in new code. Use host() instead. 178 179 @return host object representing a remote DUT. 180 181 """ 182 return self.host 183 184 185 @property 186 def command_ip(self): 187 """@return string path to ip command.""" 188 return self._command_ip 189 190 191 @property 192 def command_iptables(self): 193 """@return string path to iptables command.""" 194 return self._command_iptables 195 196 197 @property 198 def command_ping6(self): 199 """@return string path to ping6 command.""" 200 return self._command_ping6 201 202 203 @property 204 def command_wpa_cli(self): 205 """@return string path to wpa_cli command.""" 206 return self._command_wpa_cli 207 208 209 @property 210 def conductive(self): 211 """@return True if the rig is conductive; False otherwise.""" 212 if self._conductive is None: 213 self._conductive = _is_conductive(self.host) 214 return self._conductive 215 216 217 @conductive.setter 218 def conductive(self, value): 219 """Set the conductive member to True or False. 220 221 @param value: boolean value to set the conductive member to. 222 """ 223 self._conductive = value 224 225 226 @property 227 def module_name(self): 228 """@return Name of kernel module in use by this interface.""" 229 return self._interface.module_name 230 231 @property 232 def parent_device_name(self): 233 """ 234 @return Path of the parent device for the net device""" 235 return self._interface.parent_device_name 236 237 @property 238 def wifi_if(self): 239 """@return string wifi device on machine (e.g. mlan0).""" 240 return self._wifi_if 241 242 243 @property 244 def wifi_mac(self): 245 """@return string MAC address of self.wifi_if.""" 246 return self._interface.mac_address 247 248 249 @property 250 def wifi_ip(self): 251 """@return string IPv4 address of self.wifi_if.""" 252 return self._interface.ipv4_address 253 254 255 @property 256 def wifi_ip_subnet(self): 257 """@return string IPv4 subnet prefix of self.wifi_if.""" 258 return self._interface.ipv4_subnet 259 260 261 @property 262 def wifi_phy_name(self): 263 """@return wiphy name (e.g., 'phy0') or None""" 264 return self._interface.wiphy_name 265 266 @property 267 def wifi_signal_level(self): 268 """Returns the signal level of this DUT's WiFi interface. 269 270 @return int signal level of connected WiFi interface or None (e.g. -67). 271 272 """ 273 return self._interface.signal_level 274 275 276 @staticmethod 277 def assert_bsses_include_ssids(found_bsses, expected_ssids): 278 """Verifies that |found_bsses| includes |expected_ssids|. 279 280 @param found_bsses list of IwBss objects. 281 @param expected_ssids list of string SSIDs. 282 @raise error.TestFail if any element of |expected_ssids| is not found. 283 284 """ 285 for ssid in expected_ssids: 286 if not ssid: 287 continue 288 289 for bss in found_bsses: 290 if bss.ssid == ssid: 291 break 292 else: 293 raise error.TestFail('SSID %s is not in scan results: %r' % 294 (ssid, found_bsses)) 295 296 297 def wifi_noise_level(self, frequency_mhz): 298 """Returns the noise level of this DUT's WiFi interface. 299 300 @param frequency_mhz: frequency at which the noise level should be 301 measured and reported. 302 @return int signal level of connected WiFi interface in dBm (e.g. -67) 303 or None if the value is unavailable. 304 305 """ 306 return self._interface.noise_level(frequency_mhz) 307 308 309 def __init__(self, client_host, result_dir, use_wpa_cli): 310 """ 311 Construct a WiFiClient. 312 313 @param client_host host object representing a remote host. 314 @param result_dir string directory to store test logs/packet caps. 315 @param use_wpa_cli bool True if we want to use |wpa_cli| commands for 316 Android testing. 317 318 """ 319 super(WiFiClient, self).__init__(client_host, 'client', 320 inherit_interfaces=True) 321 self._command_ip = 'ip' 322 self._command_iptables = 'iptables' 323 self._command_ping6 = 'ping6' 324 self._command_wpa_cli = 'wpa_cli' 325 self._machine_id = None 326 self._result_dir = result_dir 327 self._conductive = None 328 329 if _is_eureka_host(self.host) and use_wpa_cli: 330 # Look up the WiFi device (and its MAC) on the client. 331 devs = self.iw_runner.list_interfaces(desired_if_type='managed') 332 devs = [dev for dev in devs 333 if dev.if_name not in self.WIFI_IF_BLACKLIST] 334 if not devs: 335 raise error.TestFail('No wlan devices found on %s.' % 336 self.host.hostname) 337 338 if len(devs) > 1: 339 logging.warning('Warning, found multiple WiFi devices on ' 340 '%s: %r', self.host.hostname, devs) 341 self._wifi_if = devs[0].if_name 342 self._shill_proxy = wpa_cli_proxy.WpaCliProxy( 343 self.host, self._wifi_if) 344 self._wpa_cli_proxy = self._shill_proxy 345 else: 346 self._shill_proxy = get_xmlrpc_proxy(self.host) 347 interfaces = self._shill_proxy.list_controlled_wifi_interfaces() 348 if not interfaces: 349 logging.debug('No interfaces managed by shill. Rebooting host') 350 self.host.reboot() 351 raise error.TestError('No interfaces managed by shill on %s' % 352 self.host.hostname) 353 self._wifi_if = interfaces[0] 354 self._wpa_cli_proxy = wpa_cli_proxy.WpaCliProxy( 355 self.host, self._wifi_if) 356 self._raise_logging_level() 357 self._interface = interface.Interface(self._wifi_if, host=self.host) 358 logging.debug('WiFi interface is: %r', 359 self._interface.device_description) 360 self._firewall_rules = [] 361 # All tests that use this object assume the interface starts enabled. 362 self.set_device_enabled(self._wifi_if, True) 363 # Turn off powersave mode by default. 364 self.powersave_switch(False) 365 # Invoke the |capabilities| property defined in the parent |Linuxsystem| 366 # to workaround the lazy loading of the capabilities cache and supported 367 # frequency list. This is needed for tests that may need access to these 368 # when the DUT is unreachable (for ex: suspended). 369 #pylint: disable=pointless-statement 370 self.capabilities 371 372 373 def _assert_method_supported(self, method_name): 374 """Raise a TestNAError if the XMLRPC proxy has no method |method_name|. 375 376 @param method_name: string name of method that should exist on the 377 XMLRPC proxy. 378 379 """ 380 if not self._supports_method(method_name): 381 raise error.TestNAError('%s() is not supported' % method_name) 382 383 384 def _raise_logging_level(self): 385 """Raises logging levels for WiFi on DUT.""" 386 self.host.run('wpa_debug excessive', ignore_status=True) 387 self.host.run('ff_debug --level -5', ignore_status=True) 388 self.host.run('ff_debug +wifi', ignore_status=True) 389 390 391 def is_vht_supported(self): 392 """Returns True if VHT supported; False otherwise""" 393 return self.CAPABILITY_VHT in self.capabilities 394 395 396 def is_5ghz_supported(self): 397 """Returns True if 5Ghz bands are supported; False otherwise.""" 398 return self.CAPABILITY_5GHZ in self.capabilities 399 400 401 def is_ibss_supported(self): 402 """Returns True if IBSS mode is supported; False otherwise.""" 403 return self.CAPABILITY_IBSS in self.capabilities 404 405 406 def is_frequency_supported(self, frequency): 407 """Returns True if the given frequency is supported; False otherwise. 408 409 @param frequency: int Wifi frequency to check if it is supported by 410 DUT. 411 """ 412 return frequency in self.phys_for_frequency 413 414 415 def _supports_method(self, method_name): 416 """Checks if |method_name| is supported on the remote XMLRPC proxy. 417 418 autotest will, for their own reasons, install python files in the 419 autotest client package that correspond the version of the build 420 rather than the version running on the autotest drone. This 421 creates situations where we call methods on the client XMLRPC proxy 422 that don't exist in that version of the code. This detects those 423 situations so that we can degrade more or less gracefully. 424 425 @param method_name: string name of method that should exist on the 426 XMLRPC proxy. 427 @return True if method is available, False otherwise. 428 429 """ 430 supported = (_is_eureka_host(self.host) 431 or method_name in self._shill_proxy.system.listMethods()) 432 if not supported: 433 logging.warning('%s() is not supported on older images', 434 method_name) 435 return supported 436 437 438 def close(self): 439 """Tear down state associated with the client.""" 440 self.stop_capture() 441 self.powersave_switch(False) 442 self.shill.clean_profiles() 443 super(WiFiClient, self).close() 444 445 446 def firewall_open(self, proto, src): 447 """Opens up firewall to run netperf tests. 448 449 By default, we have a firewall rule for NFQUEUE (see crbug.com/220736). 450 In order to run netperf test, we need to add a new firewall rule BEFORE 451 this NFQUEUE rule in the INPUT chain. 452 453 @param proto a string, test traffic protocol, e.g. udp, tcp. 454 @param src a string, subnet/mask. 455 456 @return a string firewall rule added. 457 458 """ 459 rule = 'INPUT -s %s/32 -p %s -m %s -j ACCEPT' % (src, proto, proto) 460 self.host.run('%s -I %s' % (self._command_iptables, rule)) 461 self._firewall_rules.append(rule) 462 return rule 463 464 465 def firewall_cleanup(self): 466 """Cleans up firewall rules.""" 467 for rule in self._firewall_rules: 468 self.host.run('%s -D %s' % (self._command_iptables, rule)) 469 self._firewall_rules = [] 470 471 472 def sync_host_times(self): 473 """Set time on our DUT to match local time.""" 474 epoch_seconds = time.time() 475 self.shill.sync_time_to(epoch_seconds) 476 477 478 def collect_debug_info(self, local_save_dir_prefix): 479 """Collect any debug information needed from the DUT 480 481 This invokes the |collect_debug_info| RPC method to trigger 482 bugreport/logcat collection and then transfers the logs to the 483 server. 484 485 @param local_save_dir_prefix Used as a prefix for local save directory. 486 """ 487 pass 488 489 490 def check_iw_link_value(self, iw_link_key, desired_value): 491 """Assert that the current wireless link property is |desired_value|. 492 493 @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner. 494 @param desired_value string desired value of iw link property. 495 496 """ 497 actual_value = self.get_iw_link_value(iw_link_key) 498 desired_value = str(desired_value) 499 if actual_value != desired_value: 500 raise error.TestFail('Wanted iw link property %s value %s, but ' 501 'got %s instead.' % (iw_link_key, 502 desired_value, 503 actual_value)) 504 505 506 def get_iw_link_value(self, iw_link_key): 507 """Get the current value of a link property for this WiFi interface. 508 509 @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner. 510 511 """ 512 return self.iw_runner.get_link_value(self.wifi_if, iw_link_key) 513 514 515 def powersave_switch(self, turn_on): 516 """Toggle powersave mode for the DUT. 517 518 @param turn_on bool True iff powersave mode should be turned on. 519 520 """ 521 mode = 'off' 522 if turn_on: 523 mode = 'on' 524 # Turn ON interface and set power_save option. 525 self.host.run('ifconfig %s up' % self.wifi_if) 526 self.host.run('iw dev %s set power_save %s' % (self.wifi_if, mode)) 527 528 529 def timed_scan(self, frequencies, ssids, scan_timeout_seconds=10, 530 retry_timeout_seconds=10): 531 """Request timed scan to discover given SSIDs. 532 533 This method will retry for a default of |retry_timeout_seconds| until it 534 is able to successfully kick off a scan. Sometimes, the device on the 535 DUT claims to be busy and rejects our requests. It will raise error 536 if the scan did not complete within |scan_timeout_seconds| or it was 537 not able to discover the given SSIDs. 538 539 @param frequencies list of int WiFi frequencies to scan for. 540 @param ssids list of string ssids to probe request for. 541 @param scan_timeout_seconds: float number of seconds the scan 542 operation not to exceed. 543 @param retry_timeout_seconds: float number of seconds to retry scanning 544 if the interface is busy. This does not retry if certain 545 SSIDs are missing from the results. 546 @return time in seconds took to complete scan request. 547 548 """ 549 # the poll method returns the result of the func 550 scan_result = utils.poll_for_condition( 551 condition=lambda: self.iw_runner.timed_scan( 552 self.wifi_if, 553 frequencies=frequencies, 554 ssids=ssids), 555 exception=error.TestFail('Unable to trigger scan on client'), 556 timeout=retry_timeout_seconds, 557 sleep_interval=0.5) 558 # Verify scan operation completed within given timeout 559 if scan_result.time > scan_timeout_seconds: 560 raise error.TestFail('Scan time %.2fs exceeds the scan timeout' % 561 (scan_result.time)) 562 563 # Verify all ssids are discovered 564 self.assert_bsses_include_ssids(scan_result.bss_list, ssids) 565 566 logging.info('Wifi scan completed in %.2f seconds', scan_result.time) 567 return scan_result.time 568 569 570 def scan(self, frequencies, ssids, timeout_seconds=10, require_match=True): 571 """Request a scan and (optionally) check that requested SSIDs appear in 572 the results. 573 574 This method will retry for a default of |timeout_seconds| until it is 575 able to successfully kick off a scan. Sometimes, the device on the DUT 576 claims to be busy and rejects our requests. 577 578 If |ssids| is non-empty, we will speficially probe for those SSIDs. 579 580 If |require_match| is True, we will verify that every element 581 of |ssids| was found in scan results. 582 583 @param frequencies list of int WiFi frequencies to scan for. 584 @param ssids list of string ssids to probe request for. 585 @param timeout_seconds: float number of seconds to retry scanning 586 if the interface is busy. This does not retry if certain 587 SSIDs are missing from the results. 588 @param require_match: bool True if we must find |ssids|. 589 590 """ 591 bss_list = utils.poll_for_condition( 592 condition=lambda: self.iw_runner.scan( 593 self.wifi_if, 594 frequencies=frequencies, 595 ssids=ssids), 596 exception=error.TestFail('Unable to trigger scan on client'), 597 timeout=timeout_seconds, 598 sleep_interval=0.5) 599 600 if require_match: 601 self.assert_bsses_include_ssids(bss_list, ssids) 602 603 604 def wait_for_bsses(self, ssid, num_bss_expected, timeout_seconds=15): 605 """Wait for all BSSes associated with given SSID to be discovered in the 606 scan. 607 608 @param ssid string name of network being queried 609 @param num_bss_expected int number of BSSes expected 610 @param timeout_seconds int seconds to wait for BSSes to be discovered 611 612 """ 613 # If the scan returns None, return 0, else return the matching count 614 num_bss_actual = 0 615 def are_all_bsses_discovered(): 616 """Determine if all BSSes associated with the SSID from parent 617 function are discovered in the scan 618 619 @return boolean representing whether the expected bss count matches 620 how many in the scan match the given ssid 621 """ 622 self.claim_wifi_if() # Stop shill/supplicant scans 623 try: 624 scan_results = self.iw_runner.scan( 625 self.wifi_if, 626 frequencies=[], 627 ssids=[ssid]) 628 if scan_results is None: 629 return False 630 num_bss_actual = sum(ssid == bss.ssid for bss in scan_results) 631 return num_bss_expected == num_bss_actual 632 finally: 633 self.release_wifi_if() 634 635 utils.poll_for_condition( 636 condition=are_all_bsses_discovered, 637 exception=error.TestFail('Failed to discover all BSSes. Found %d,' 638 ' wanted %d with SSID %s' % 639 (num_bss_actual, num_bss_expected, ssid)), 640 timeout=timeout_seconds, 641 sleep_interval=0.5) 642 643 def wait_for_service_states(self, ssid, states, timeout_seconds): 644 """Waits for a WiFi service to achieve one of |states|. 645 646 @param ssid string name of network being queried 647 @param states tuple list of states for which the caller is waiting 648 @param timeout_seconds int seconds to wait for a state in |states| 649 650 """ 651 logging.info('Waiting for %s to reach one of %r...', ssid, states) 652 success, state, duration = self._shill_proxy.wait_for_service_states( 653 ssid, states, timeout_seconds) 654 logging.info('...ended up in state \'%s\' (%s) after %f seconds.', 655 state, 'success' if success else 'failure', duration) 656 return success, state, duration 657 658 659 def do_suspend(self, seconds): 660 """Puts the DUT in suspend power state for |seconds| seconds. 661 662 @param seconds: The number of seconds to suspend the device. 663 664 """ 665 logging.info('Suspending DUT for %d seconds...', seconds) 666 self._shill_proxy.do_suspend(seconds) 667 logging.info('...done suspending') 668 669 670 def do_suspend_bg(self, seconds): 671 """Suspend DUT using the power manager - non-blocking. 672 673 @param seconds: The number of seconds to suspend the device. 674 675 """ 676 logging.info('Suspending DUT (in background) for %d seconds...', 677 seconds) 678 self._shill_proxy.do_suspend_bg(seconds) 679 680 681 def clear_supplicant_blacklist(self): 682 """Clear's the AP blacklist on the DUT. 683 684 @return stdout and stderror returns passed from wpa_cli command. 685 686 """ 687 result = self._wpa_cli_proxy.run_wpa_cli_cmd('blacklist clear', 688 check_result=False); 689 logging.info('wpa_cli blacklist clear: out:%r err:%r', result.stdout, 690 result.stderr) 691 return result.stdout, result.stderr 692 693 694 def get_active_wifi_SSIDs(self): 695 """Get a list of visible SSID's around the DUT 696 697 @return list of string SSIDs 698 699 """ 700 self._assert_method_supported('get_active_wifi_SSIDs') 701 return self._shill_proxy.get_active_wifi_SSIDs() 702 703 704 def roam_threshold(self, value): 705 """Get a context manager to temporarily change wpa_supplicant's 706 roaming threshold for the specified interface. 707 708 The correct way to use this method is: 709 710 with client.roam_threshold(40): 711 ... 712 713 @param value: the desired roam threshold for the test. 714 715 @return a context manager for the threshold. 716 717 """ 718 return TemporaryDeviceDBusProperty(self._shill_proxy, 719 self.wifi_if, 720 self.ROAM_THRESHOLD, 721 value) 722 723 724 def set_device_enabled(self, wifi_interface, value, 725 fail_on_unsupported=False): 726 """Enable or disable the WiFi device. 727 728 @param wifi_interface: string name of interface being modified. 729 @param enabled: boolean; true if this device should be enabled, 730 false if this device should be disabled. 731 @return True if it worked; False, otherwise. 732 733 """ 734 if fail_on_unsupported: 735 self._assert_method_supported('set_device_enabled') 736 elif not self._supports_method('set_device_enabled'): 737 return False 738 return self._shill_proxy.set_device_enabled(wifi_interface, value) 739 740 741 def add_arp_entry(self, ip_address, mac_address): 742 """Add an ARP entry to the table associated with the WiFi interface. 743 744 @param ip_address: string IP address associated with the new ARP entry. 745 @param mac_address: string MAC address associated with the new ARP 746 entry. 747 748 """ 749 self.host.run('ip neigh add %s lladdr %s dev %s nud perm' % 750 (ip_address, mac_address, self.wifi_if)) 751 752 753 def discover_tdls_link(self, mac_address): 754 """Send a TDLS Discover to |peer_mac_address|. 755 756 @param mac_address: string MAC address associated with the TDLS peer. 757 758 @return bool True if operation initiated successfully, False otherwise. 759 760 """ 761 logging.info('TDLS discovery with peer %s', mac_address) 762 return self._shill_proxy.discover_tdls_link(self.wifi_if, mac_address) 763 764 765 def establish_tdls_link(self, mac_address): 766 """Establish a TDLS link with |mac_address|. 767 768 @param mac_address: string MAC address associated with the TDLS peer. 769 770 @return bool True if operation initiated successfully, False otherwise. 771 772 """ 773 logging.info('Establishing TDLS link with peer %s', mac_address) 774 return self._shill_proxy.establish_tdls_link(self.wifi_if, mac_address) 775 776 777 def query_tdls_link(self, mac_address): 778 """Query a TDLS link with |mac_address|. 779 780 @param mac_address: string MAC address associated with the TDLS peer. 781 782 @return string indicating current TDLS connectivity. 783 784 """ 785 logging.info('Querying TDLS link with peer %s', mac_address) 786 return self._shill_proxy.query_tdls_link(self.wifi_if, mac_address) 787 788 789 def add_wake_packet_source(self, source_ip): 790 """Add |source_ip| as a source that can wake us up with packets. 791 792 @param source_ip: IP address from which to wake upon receipt of packets 793 794 @return True if successful, False otherwise. 795 796 """ 797 return self._shill_proxy.add_wake_packet_source( 798 self.wifi_if, source_ip) 799 800 801 def remove_wake_packet_source(self, source_ip): 802 """Remove |source_ip| as a source that can wake us up with packets. 803 804 @param source_ip: IP address to stop waking on packets from 805 806 @return True if successful, False otherwise. 807 808 """ 809 return self._shill_proxy.remove_wake_packet_source( 810 self.wifi_if, source_ip) 811 812 813 def remove_all_wake_packet_sources(self): 814 """Remove all IPs as sources that can wake us up with packets. 815 816 @return True if successful, False otherwise. 817 818 """ 819 return self._shill_proxy.remove_all_wake_packet_sources(self.wifi_if) 820 821 822 def wake_on_wifi_features(self, features): 823 """Shill supports programming the NIC to wake on special kinds of 824 incoming packets, or on changes to the available APs (disconnect, 825 coming in range of a known SSID). This method allows you to configure 826 what wake-on-WiFi mechanisms are active. It returns a context manager, 827 because this is a system-wide setting and we don't want it to persist 828 across different tests. 829 830 If you enable wake-on-packet, then the IPs registered by 831 add_wake_packet_source will be able to wake the system from suspend. 832 833 The correct way to use this method is: 834 835 with client.wake_on_wifi_features(WAKE_ON_WIFI_DARKCONNECT): 836 ... 837 838 @param features: string from the WAKE_ON_WIFI constants above. 839 840 @return a context manager for the features. 841 842 """ 843 return TemporaryDeviceDBusProperty(self._shill_proxy, 844 self.wifi_if, 845 self.WAKE_ON_WIFI_FEATURES, 846 features) 847 848 849 def net_detect_scan_period_seconds(self, period): 850 """Sets the period between net detect scans performed by the NIC to look 851 for whitelisted SSIDs to |period|. This setting only takes effect if the 852 NIC is programmed to wake on SSID. Use as with roam_threshold. 853 854 @param period: integer number of seconds between NIC net detect scans 855 856 @return a context manager for the net detect scan period 857 858 """ 859 return TemporaryDeviceDBusProperty(self._shill_proxy, 860 self.wifi_if, 861 self.NET_DETECT_SCAN_PERIOD, 862 period) 863 864 865 def wake_to_scan_period_seconds(self, period): 866 """Sets the period between RTC timer wakeups where the system is woken 867 from suspend to perform scans. This setting only takes effect if the 868 NIC is programmed to wake on SSID. Use as with roam_threshold. 869 870 @param period: integer number of seconds between wake to scan RTC timer 871 wakes. 872 873 @return a context manager for the net detect scan period 874 875 """ 876 return TemporaryDeviceDBusProperty(self._shill_proxy, 877 self.wifi_if, 878 self.WAKE_TO_SCAN_PERIOD, 879 period) 880 881 882 def force_wake_to_scan_timer(self, is_forced): 883 """Sets the boolean value determining whether or not to force the use of 884 the wake to scan RTC timer, which wakes the system from suspend 885 periodically to scan for networks. 886 887 @param is_forced: boolean whether or not to force the use of the wake to 888 scan timer 889 890 @return a context manager for the net detect scan period 891 892 """ 893 return TemporaryDeviceDBusProperty(self._shill_proxy, 894 self.wifi_if, 895 self.FORCE_WAKE_TO_SCAN_TIMER, 896 is_forced) 897 898 899 def mac_address_randomization(self, enabled): 900 """Sets the boolean value determining whether or not to enable MAC 901 address randomization. This instructs the NIC to randomize the last 902 three octets of the MAC address used in probe requests while 903 disconnected to make the DUT harder to track. 904 905 If MAC address randomization is not supported on this DUT and the 906 caller tries to turn it on, this raises a TestNAError. 907 908 @param enabled: boolean whether or not to enable MAC address 909 randomization 910 911 @return a context manager for the MAC address randomization property 912 913 """ 914 if not self._shill_proxy.get_dbus_property_on_device( 915 self.wifi_if, self.MAC_ADDRESS_RANDOMIZATION_SUPPORTED): 916 if enabled: 917 raise error.TestNAError( 918 'MAC address randomization not supported') 919 else: 920 # Return a no-op context manager. 921 return contextlib.nested() 922 923 return TemporaryDeviceDBusProperty( 924 self._shill_proxy, 925 self.wifi_if, 926 self.MAC_ADDRESS_RANDOMIZATION_ENABLED, 927 enabled) 928 929 930 def request_roam(self, bssid): 931 """Request that we roam to the specified BSSID. 932 933 Note that this operation assumes that: 934 935 1) We're connected to an SSID for which |bssid| is a member. 936 2) There is a BSS with an appropriate ID in our scan results. 937 938 This method does not check for success of either the command or 939 the roaming operation. 940 941 @param bssid: string MAC address of bss to roam to. 942 943 """ 944 self._wpa_cli_proxy.run_wpa_cli_cmd('roam %s' % bssid, 945 check_result=False); 946 return True 947 948 949 def request_roam_dbus(self, bssid, iface): 950 """Request that we roam to the specified BSSID through dbus. 951 952 Note that this operation assumes that: 953 954 1) We're connected to an SSID for which |bssid| is a member. 955 2) There is a BSS with an appropriate ID in our scan results. 956 957 @param bssid: string MAC address of bss to roam to. 958 @param iface: interface to use 959 960 """ 961 self._assert_method_supported('request_roam_dbus') 962 self._shill_proxy.request_roam_dbus(bssid, iface) 963 964 965 def wait_for_roam(self, bssid, timeout_seconds=10.0): 966 """Wait for a roam to the given |bssid|. 967 968 @param bssid: string bssid to expect a roam to 969 (e.g. '00:11:22:33:44:55'). 970 @param timeout_seconds: float number of seconds to wait for a roam. 971 @return True if we detect an association to the given |bssid| within 972 |timeout_seconds|. 973 974 """ 975 def attempt_roam(): 976 """Perform a roam to the given |bssid| 977 978 @return True if there is an assocation between the given |bssid| 979 and that association is detected within the timeout frame 980 """ 981 current_bssid = self.iw_runner.get_current_bssid(self.wifi_if) 982 logging.debug('Current BSSID is %s.', current_bssid) 983 return current_bssid == bssid 984 985 start_time = time.time() 986 duration = 0.0 987 try: 988 success = utils.poll_for_condition( 989 condition=attempt_roam, 990 timeout=timeout_seconds, 991 sleep_interval=0.5, 992 desc='Wait for a roam to the given bssid') 993 # wait_for_roam should return False on timeout 994 except utils.TimeoutError: 995 success = False 996 997 duration = time.time() - start_time 998 logging.debug('%s to %s in %f seconds.', 999 'Roamed ' if success else 'Failed to roam ', 1000 bssid, 1001 duration) 1002 return success 1003 1004 1005 def wait_for_ssid_vanish(self, ssid): 1006 """Wait for shill to notice that there are no BSS's for an SSID present. 1007 1008 Raise a test failing exception if this does not come to pass. 1009 1010 @param ssid: string SSID of the network to require be missing. 1011 1012 """ 1013 def is_missing_yet(): 1014 """Determine if the ssid is removed from the service list yet 1015 1016 @return True if the ssid is not found in the service list 1017 """ 1018 visible_ssids = self.get_active_wifi_SSIDs() 1019 logging.info('Got service list: %r', visible_ssids) 1020 if ssid not in visible_ssids: 1021 return True 1022 self.scan(frequencies=[], ssids=[], timeout_seconds=30) 1023 1024 utils.poll_for_condition( 1025 condition=is_missing_yet, # func 1026 exception=error.TestFail('shill should mark BSS not present'), 1027 timeout=self.MAX_SERVICE_GONE_TIMEOUT_SECONDS, 1028 sleep_interval=0) 1029 1030 def reassociate(self, timeout_seconds=10): 1031 """Reassociate to the connected network. 1032 1033 @param timeout_seconds: float number of seconds to wait for operation 1034 to complete. 1035 1036 """ 1037 logging.info('Attempt to reassociate') 1038 with self.iw_runner.get_event_logger() as logger: 1039 logger.start() 1040 # Issue reattach command to wpa_supplicant 1041 self._wpa_cli_proxy.run_wpa_cli_cmd('reattach'); 1042 1043 # Wait for the timeout seconds for association to complete 1044 time.sleep(timeout_seconds) 1045 1046 # Stop iw event logger 1047 logger.stop() 1048 1049 # Get association time based on the iw event log 1050 reassociate_time = logger.get_reassociation_time() 1051 if reassociate_time is None or reassociate_time > timeout_seconds: 1052 raise error.TestFail( 1053 'Failed to reassociate within given timeout') 1054 logging.info('Reassociate time: %.2f seconds', reassociate_time) 1055 1056 1057 def wait_for_connection(self, ssid, timeout_seconds=30, freq=None, 1058 ping_ip=None, desired_subnet=None): 1059 """Verifies a connection to network ssid, optionally verifying 1060 frequency, ping connectivity and subnet. 1061 1062 @param ssid string ssid of the network to check. 1063 @param timeout_seconds int number of seconds to wait for 1064 connection on the given frequency. 1065 @param freq int frequency of network to check. 1066 @param ping_ip string ip address to ping for verification. 1067 @param desired_subnet string expected subnet in which client 1068 ip address should reside. 1069 1070 @returns a named tuple of (state, time) 1071 """ 1072 start_time = time.time() 1073 duration = lambda: time.time() - start_time 1074 state = [None] # need mutability for the nested method to save state 1075 1076 def verify_connection(): 1077 """Verify the connection and perform optional operations 1078 as defined in the parent function 1079 1080 @return False if there is a failure in the connection or 1081 the prescribed verification steps 1082 The named tuple ConnectTime otherwise, with the state 1083 and connection time from waiting for service states 1084 """ 1085 success, state[0], conn_time = self.wait_for_service_states( 1086 ssid, 1087 self.CONNECTED_STATES, 1088 int(math.ceil(timeout_seconds - duration()))) 1089 if not success: 1090 return False 1091 1092 if freq: 1093 actual_freq = self.get_iw_link_value( 1094 iw_runner.IW_LINK_KEY_FREQUENCY) 1095 if str(freq) != actual_freq: 1096 logging.debug( 1097 'Waiting for desired frequency %s (got %s).', 1098 freq, 1099 actual_freq) 1100 return False 1101 1102 if desired_subnet: 1103 actual_subnet = self.wifi_ip_subnet 1104 if actual_subnet != desired_subnet: 1105 logging.debug( 1106 'Waiting for desired subnet %s (got %s).', 1107 desired_subnet, 1108 actual_subnet) 1109 return False 1110 1111 if ping_ip: 1112 ping_config = ping_runner.PingConfig(ping_ip) 1113 self.ping(ping_config) 1114 1115 return ConnectTime(state[0], conn_time) 1116 1117 freq_error_str = (' on frequency %d Mhz' % freq) if freq else '' 1118 1119 return utils.poll_for_condition( 1120 condition=verify_connection, 1121 exception=error.TestFail( 1122 'Failed to connect to "%s"%s in %f seconds (state=%s)' % 1123 (ssid, 1124 freq_error_str, 1125 duration(), 1126 state[0])), 1127 timeout=timeout_seconds, 1128 sleep_interval=0) 1129 1130 @contextmanager 1131 def assert_disconnect_count(self, count): 1132 """Context asserting |count| disconnects for the context lifetime. 1133 1134 Creates an iw logger during the lifetime of the context and asserts 1135 that the client disconnects exactly |count| times. 1136 1137 @param count int the expected number of disconnections. 1138 1139 """ 1140 with self.iw_runner.get_event_logger() as logger: 1141 logger.start() 1142 yield 1143 logger.stop() 1144 if logger.get_disconnect_count() != count: 1145 raise error.TestFail( 1146 'Client disconnected %d times; expected %d' % 1147 (logger.get_disconnect_count(), count)) 1148 1149 1150 def assert_no_disconnects(self): 1151 """Context asserting no disconnects for the context lifetime.""" 1152 return self.assert_disconnect_count(0) 1153 1154 1155 @contextmanager 1156 def assert_disconnect_event(self): 1157 """Context asserting at least one disconnect for the context lifetime. 1158 1159 Creates an iw logger during the lifetime of the context and asserts 1160 that the client disconnects at least one time. 1161 1162 """ 1163 with self.iw_runner.get_event_logger() as logger: 1164 logger.start() 1165 yield 1166 logger.stop() 1167 if logger.get_disconnect_count() == 0: 1168 raise error.TestFail('Client did not disconnect') 1169 1170 1171 def get_num_card_resets(self): 1172 """Get card reset count.""" 1173 reset_msg = '[m]wifiex_sdio_card_reset' 1174 result = self.host.run('grep -c %s /var/log/messages' % reset_msg, 1175 ignore_status=True) 1176 if result.exit_status == 1: 1177 return 0 1178 count = int(result.stdout.strip()) 1179 return count 1180 1181 1182 def get_disconnect_reasons(self): 1183 """Get disconnect reason codes.""" 1184 disconnect_reason_msg = "updated DisconnectReason " 1185 disconnect_reason_cleared = "clearing DisconnectReason for " 1186 result = self.host.run('grep -E "(%s|%s)" /var/log/net.log' % 1187 (disconnect_reason_msg, 1188 disconnect_reason_cleared), 1189 ignore_status=True) 1190 if result.exit_status == 1: 1191 return None 1192 1193 lines = result.stdout.strip().split('\n') 1194 disconnect_reasons = [] 1195 disconnect_reason_regex = re.compile(' to (\D?\d+)') 1196 1197 found = False 1198 for line in reversed(lines): 1199 match = disconnect_reason_regex.search(line) 1200 if match is not None: 1201 disconnect_reasons.append(match.group(1)) 1202 found = True 1203 else: 1204 if (found): 1205 break 1206 return list(reversed(disconnect_reasons)) 1207 1208 1209 def release_wifi_if(self): 1210 """Release the control over the wifi interface back to normal operation. 1211 1212 This will give the ownership of the wifi interface back to shill and 1213 wpa_supplicant. 1214 1215 """ 1216 self.set_device_enabled(self._wifi_if, True) 1217 1218 1219 def claim_wifi_if(self): 1220 """Claim the control over the wifi interface from this wifi client. 1221 1222 This claim the ownership of the wifi interface from shill and 1223 wpa_supplicant. The wifi interface should be UP when this call returns. 1224 1225 """ 1226 # Disabling a wifi device in shill will remove that device from 1227 # wpa_supplicant as well. 1228 self.set_device_enabled(self._wifi_if, False) 1229 1230 # Wait for shill to bring down the wifi interface. 1231 is_interface_down = lambda: not self._interface.is_up 1232 utils.poll_for_condition( 1233 is_interface_down, 1234 timeout=INTERFACE_DOWN_WAIT_TIME_SECONDS, 1235 sleep_interval=0.5, 1236 desc='Timeout waiting for interface to go down.') 1237 # Bring up the wifi interface to allow the test to use the interface. 1238 self.host.run('%s link set %s up' % (self.cmd_ip, self.wifi_if)) 1239 1240 1241 def set_sched_scan(self, enable, fail_on_unsupported=False): 1242 """enable/disable scheduled scan. 1243 1244 @param enable bool flag indicating to enable/disable scheduled scan. 1245 1246 """ 1247 if fail_on_unsupported: 1248 self._assert_method_supported('set_sched_scan') 1249 elif not self._supports_method('set_sched_scan'): 1250 return False 1251 return self._shill_proxy.set_sched_scan(enable) 1252 1253 1254 def check_connected_on_last_resume(self): 1255 """Checks whether the DUT was connected on its last resume. 1256 1257 Checks that the DUT was connected after waking from suspend by parsing 1258 the last instance shill log message that reports shill's connection 1259 status on resume. Fails the test if this log message reports that 1260 the DUT woke up disconnected. 1261 1262 """ 1263 # As of build R43 6913.0.0, the shill log message from the function 1264 # OnAfterResume is called as soon as shill resumes from suspend, and 1265 # will report whether or not shill is connected. The log message will 1266 # take one of the following two forms: 1267 # 1268 # [...] [INFO:wifi.cc(1941)] OnAfterResume: connected 1269 # [...] [INFO:wifi.cc(1941)] OnAfterResume: not connected 1270 # 1271 # where 1941 is an arbitrary PID number. By checking if the last 1272 # instance of this message contains the substring "not connected", we 1273 # can determine whether or not shill was connected on its last resume. 1274 connection_status_log_regex_str = 'INFO:wifi\.cc.*OnAfterResume' 1275 not_connected_substr = 'not connected' 1276 connected_substr = 'connected' 1277 1278 cmd = ('grep -E %s /var/log/net.log | tail -1' % 1279 connection_status_log_regex_str) 1280 connection_status_log = self.host.run(cmd).stdout 1281 if not connection_status_log: 1282 raise error.TestFail('Could not find resume connection status log ' 1283 'message.') 1284 1285 logging.debug('Connection status message:\n%s', connection_status_log) 1286 if not_connected_substr in connection_status_log: 1287 raise error.TestFail('Client was not connected upon waking from ' 1288 'suspend.') 1289 1290 if not connected_substr in connection_status_log: 1291 raise error.TestFail('Last resume log message did not contain ' 1292 'connection status.') 1293 1294 logging.info('Client was connected upon waking from suspend.') 1295 1296 1297 def check_wake_on_wifi_throttled(self): 1298 """ 1299 Checks whether wake on WiFi was throttled on the DUT on the last dark 1300 resume. Check for this by parsing shill logs for a throttling message. 1301 1302 """ 1303 # We are looking for an dark resume error log message indicating that 1304 # wake on WiFi was throttled. This is an example of the error message: 1305 # [...] [ERROR:wake_on_wifi.cc(1304)] OnDarkResume: Too many dark \ 1306 # resumes; disabling wake on WiFi temporarily 1307 dark_resume_log_regex_str = 'ERROR:wake_on_wifi\.cc.*OnDarkResume:.*' 1308 throttled_msg_substr = ('Too many dark resumes; disabling wake on ' 1309 'WiFi temporarily') 1310 1311 cmd = ('grep -E %s /var/log/net.log | tail -1' % 1312 dark_resume_log_regex_str) 1313 last_dark_resume_error_log = self.host.run(cmd).stdout 1314 if not last_dark_resume_error_log: 1315 raise error.TestFail('Could not find a dark resume log message.') 1316 1317 logging.debug('Last dark resume log message:\n%s', 1318 last_dark_resume_error_log) 1319 if not throttled_msg_substr in last_dark_resume_error_log: 1320 raise error.TestFail('Wake on WiFi was not throttled on the last ' 1321 'dark resume.') 1322 1323 logging.info('Wake on WiFi was throttled on the last dark resume.') 1324 1325 1326 def shill_debug_log(self, message): 1327 """Logs a message to the shill log (i.e. /var/log/net.log). This 1328 message will be logged at the DEBUG level. 1329 1330 @param message: the message to be logged. 1331 1332 """ 1333 logging.info(message) 1334 logger_command = ('/usr/bin/logger' 1335 ' --tag shill' 1336 ' --priority daemon.debug' 1337 ' "%s"' % utils.sh_escape(message)) 1338 self.host.run(logger_command) 1339 1340 1341 def is_wake_on_wifi_supported(self): 1342 """Returns true iff wake-on-WiFi is supported by the DUT.""" 1343 1344 if (self.shill.get_dbus_property_on_device( 1345 self.wifi_if, self.WAKE_ON_WIFI_FEATURES) == 1346 WAKE_ON_WIFI_NOT_SUPPORTED): 1347 return False 1348 return True 1349 1350 1351 def set_manager_property(self, prop_name, prop_value): 1352 """Sets the given manager property to the value provided. 1353 1354 @param prop_name: the property to be set 1355 @param prop_value: value to assign to the prop_name 1356 @return a context manager for the setting 1357 1358 """ 1359 return TemporaryManagerDBusProperty(self._shill_proxy, 1360 prop_name, 1361 prop_value) 1362 1363 1364class TemporaryDeviceDBusProperty: 1365 """Utility class to temporarily change a dbus property for the WiFi device. 1366 1367 Since dbus properties are global and persistent settings, we want 1368 to make sure that we change them back to what they were before the test 1369 started. 1370 1371 """ 1372 1373 def __init__(self, shill_proxy, iface, prop_name, value): 1374 """Construct a TemporaryDeviceDBusProperty context manager. 1375 1376 1377 @param shill_proxy: the shill proxy to use to communicate via dbus 1378 @param iface: device whose property to change (e.g. 'wlan0') 1379 @param prop_name: the name of the property we want to set 1380 @param value: the desired value of the property 1381 1382 """ 1383 self._shill = shill_proxy 1384 self._interface = iface 1385 self._prop_name = prop_name 1386 self._value = value 1387 self._saved_value = None 1388 1389 1390 def __enter__(self): 1391 logging.info('- Setting property %s on device %s', 1392 self._prop_name, 1393 self._interface) 1394 1395 self._saved_value = self._shill.get_dbus_property_on_device( 1396 self._interface, self._prop_name) 1397 if self._saved_value is None: 1398 raise error.TestFail('Device or property not found.') 1399 if not self._shill.set_dbus_property_on_device(self._interface, 1400 self._prop_name, 1401 self._value): 1402 raise error.TestFail('Could not set property') 1403 1404 logging.info('- Changed value from %s to %s', 1405 self._saved_value, 1406 self._value) 1407 1408 1409 def __exit__(self, exception, value, traceback): 1410 logging.info('- Resetting property %s', self._prop_name) 1411 1412 if not self._shill.set_dbus_property_on_device(self._interface, 1413 self._prop_name, 1414 self._saved_value): 1415 raise error.TestFail('Could not reset property') 1416 1417 1418class TemporaryManagerDBusProperty: 1419 """Utility class to temporarily change a Manager dbus property. 1420 1421 Since dbus properties are global and persistent settings, we want 1422 to make sure that we change them back to what they were before the test 1423 started. 1424 1425 """ 1426 1427 def __init__(self, shill_proxy, prop_name, value): 1428 """Construct a TemporaryManagerDBusProperty context manager. 1429 1430 @param shill_proxy: the shill proxy to use to communicate via dbus 1431 @param prop_name: the name of the property we want to set 1432 @param value: the desired value of the property 1433 1434 """ 1435 self._shill = shill_proxy 1436 self._prop_name = prop_name 1437 self._value = value 1438 self._saved_value = None 1439 1440 1441 def __enter__(self): 1442 logging.info('- Setting Manager property: %s', self._prop_name) 1443 1444 self._saved_value = self._shill.get_manager_property( 1445 self._prop_name) 1446 if self._saved_value is None: 1447 self._saved_value = "" 1448 if not self._shill.set_optional_manager_property(self._prop_name, 1449 self._value): 1450 raise error.TestFail('Could not set optional manager property.') 1451 else: 1452 setprop_result = self._shill.set_manager_property(self._prop_name, 1453 self._value) 1454 if not setprop_result: 1455 raise error.TestFail('Could not set manager property') 1456 1457 logging.info('- Changed value from [%s] to [%s]', 1458 self._saved_value, 1459 self._value) 1460 1461 1462 def __exit__(self, exception, value, traceback): 1463 logging.info('- Resetting property %s to [%s]', 1464 self._prop_name, 1465 self._saved_value) 1466 1467 if not self._shill.set_manager_property(self._prop_name, 1468 self._saved_value): 1469 raise error.TestFail('Could not reset manager property') 1470