1# Lint as: python2, python3 2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import contextlib 7import logging 8import math 9import re 10import time 11 12from contextlib import contextmanager 13from collections import namedtuple 14 15from autotest_lib.client.common_lib import error 16from autotest_lib.client.common_lib import utils 17from autotest_lib.client.common_lib.cros.network import interface 18from autotest_lib.client.common_lib.cros.network import iw_runner 19from autotest_lib.client.common_lib.cros.network import ping_runner 20from autotest_lib.client.cros import constants 21from autotest_lib.server import autotest 22from autotest_lib.server import site_linux_system 23from autotest_lib.server.cros.network import wpa_cli_proxy 24from autotest_lib.server.cros.network import wpa_mon 25from autotest_lib.server.hosts import cast_os_host 26 27# Wake-on-WiFi feature strings 28WAKE_ON_WIFI_NONE = 'none' 29WAKE_ON_WIFI_PACKET = 'packet' 30WAKE_ON_WIFI_DARKCONNECT = 'darkconnect' 31WAKE_ON_WIFI_PACKET_DARKCONNECT = 'packet_and_darkconnect' 32WAKE_ON_WIFI_NOT_SUPPORTED = 'not_supported' 33 34# Wake-on-WiFi test timing constants 35SUSPEND_WAIT_TIME_SECONDS = 10 36RECEIVE_PACKET_WAIT_TIME_SECONDS = 10 37DARK_RESUME_WAIT_TIME_SECONDS = 25 38WAKE_TO_SCAN_PERIOD_SECONDS = 30 39NET_DETECT_SCAN_WAIT_TIME_SECONDS = 15 40WAIT_UP_TIMEOUT_SECONDS = 10 41DISCONNECT_WAIT_TIME_SECONDS = 10 42INTERFACE_DOWN_WAIT_TIME_SECONDS = 10 43 44ConnectTime = namedtuple('ConnectTime', 'state, time') 45 46XMLRPC_BRINGUP_TIMEOUT_SECONDS = 60 47SHILL_XMLRPC_LOG_PATH = '/var/log/shill_xmlrpc_server.log' 48 49 50def _is_eureka_host(host): 51 return host.get_os_type() == cast_os_host.OS_TYPE_CAST_OS 52 53 54def get_xmlrpc_proxy(host): 55 """Get a shill XMLRPC proxy for |host|. 56 57 The returned object has no particular type. Instead, when you call 58 a method on the object, it marshalls the objects passed as arguments 59 and uses them to make RPCs on the remote server. Thus, you should 60 read shill_xmlrpc_server.py to find out what methods are supported. 61 62 @param host: host object representing a remote device. 63 @return proxy object for remote XMLRPC server. 64 65 """ 66 # Make sure the client library is on the device so that the proxy 67 # code is there when we try to call it. 68 if host.is_client_install_supported: 69 client_at = autotest.Autotest(host) 70 client_at.install() 71 # This is the default port for shill xmlrpc server. 72 server_port = constants.SHILL_XMLRPC_SERVER_PORT 73 xmlrpc_server_command = constants.SHILL_XMLRPC_SERVER_COMMAND 74 log_path = SHILL_XMLRPC_LOG_PATH 75 command_name = constants.SHILL_XMLRPC_SERVER_CLEANUP_PATTERN 76 rpc_server_host = host 77 78 # Start up the XMLRPC proxy on the client 79 proxy = rpc_server_host.rpc_server_tracker.xmlrpc_connect( 80 xmlrpc_server_command, 81 server_port, 82 command_name=command_name, 83 ready_test_name=constants.SHILL_XMLRPC_SERVER_READY_METHOD, 84 timeout_seconds=XMLRPC_BRINGUP_TIMEOUT_SECONDS, 85 logfile=log_path 86 ) 87 return proxy 88 89 90def _is_conductive(host): 91 """Determine if the host is conductive based on AFE labels. 92 93 @param host: A Host object. 94 """ 95 info = host.host_info_store.get() 96 conductive = info.get_label_value('conductive') 97 return conductive.lower() == 'true' 98 99 100class WiFiClient(site_linux_system.LinuxSystem): 101 """WiFiClient is a thin layer of logic over a remote DUT in wifitests.""" 102 103 DEFAULT_PING_COUNT = 10 104 COMMAND_PING = 'ping' 105 106 MAX_SERVICE_GONE_TIMEOUT_SECONDS = 60 107 108 # List of interface names we won't consider for use as "the" WiFi interface 109 # on Android or CastOS hosts. 110 WIFI_IF_BLOCKLIST = ['p2p0', 'wfd0'] 111 112 UNKNOWN_BOARD_TYPE = 'unknown' 113 114 # DBus device properties. Wireless interfaces should support these. 115 WAKE_ON_WIFI_FEATURES = 'WakeOnWiFiFeaturesEnabled' 116 NET_DETECT_SCAN_PERIOD = 'NetDetectScanPeriodSeconds' 117 WAKE_TO_SCAN_PERIOD = 'WakeToScanPeriodSeconds' 118 FORCE_WAKE_TO_SCAN_TIMER = 'ForceWakeToScanTimer' 119 MAC_ADDRESS_RANDOMIZATION_SUPPORTED = 'MACAddressRandomizationSupported' 120 MAC_ADDRESS_RANDOMIZATION_ENABLED = 'MACAddressRandomizationEnabled' 121 122 CONNECTED_STATES = ['portal', 'no-connectivity', 'redirect-found', 123 'portal-suspected', 'online', 'ready'] 124 125 @property 126 def machine_id(self): 127 """@return string unique to a particular board/cpu configuration.""" 128 if self._machine_id: 129 return self._machine_id 130 131 uname_result = self.host.run('uname -m', ignore_status=True) 132 kernel_arch = '' 133 if not uname_result.exit_status and uname_result.stdout.find(' ') < 0: 134 kernel_arch = uname_result.stdout.strip() 135 cpu_info = self.host.run('cat /proc/cpuinfo').stdout.splitlines() 136 cpu_count = len([x for x in cpu_info if x.lower().startswith('bogomips')]) 137 cpu_count_str = '' 138 if cpu_count: 139 cpu_count_str = 'x%d' % cpu_count 140 ghz_value = '' 141 ghz_pattern = re.compile('([0-9.]+GHz)') 142 for line in cpu_info: 143 match = ghz_pattern.search(line) 144 if match is not None: 145 ghz_value = '_' + match.group(1) 146 break 147 148 return '%s_%s%s%s' % (self.board, kernel_arch, ghz_value, cpu_count_str) 149 150 151 @property 152 def powersave_on(self): 153 """@return bool True iff WiFi powersave mode is enabled.""" 154 result = self.host.run("iw dev %s get power_save" % self.wifi_if) 155 output = result.stdout.rstrip() # NB: chop \n 156 # Output should be either "Power save: on" or "Power save: off". 157 find_re = re.compile(r'([^:]+):\s+(\w+)') 158 find_results = find_re.match(output) 159 if not find_results: 160 raise error.TestFail('Failed to find power_save parameter ' 161 'in iw results.') 162 163 return find_results.group(2) == 'on' 164 165 166 @property 167 def shill(self): 168 """@return shill RPCProxy object.""" 169 return self._shill_proxy 170 171 172 @property 173 def client(self): 174 """Deprecated accessor for the client host. 175 176 The term client is used very loosely in old autotests and this 177 accessor should not be used in new code. Use host() instead. 178 179 @return host object representing a remote DUT. 180 181 """ 182 return self.host 183 184 185 @property 186 def command_ip(self): 187 """@return string path to ip command.""" 188 return self._command_ip 189 190 191 @property 192 def command_iptables(self): 193 """@return string path to iptables command.""" 194 return self._command_iptables 195 196 197 @property 198 def command_ping6(self): 199 """@return string path to ping6 command.""" 200 return self._command_ping6 201 202 203 @property 204 def command_wpa_cli(self): 205 """@return string path to wpa_cli command.""" 206 return self._command_wpa_cli 207 208 209 @property 210 def conductive(self): 211 """@return True if the rig is conductive; False otherwise.""" 212 if self._conductive is None: 213 self._conductive = _is_conductive(self.host) 214 return self._conductive 215 216 217 @conductive.setter 218 def conductive(self, value): 219 """Set the conductive member to True or False. 220 221 @param value: boolean value to set the conductive member to. 222 """ 223 self._conductive = value 224 225 226 @property 227 def module_name(self): 228 """@return Name of kernel module in use by this interface.""" 229 return self._interface.module_name 230 231 @property 232 def parent_device_name(self): 233 """ 234 @return Path of the parent device for the net device""" 235 return self._interface.parent_device_name 236 237 @property 238 def wifi_if(self): 239 """@return string wifi device on machine (e.g. mlan0).""" 240 return self._wifi_if 241 242 243 @property 244 def wifi_mac(self): 245 """@return string MAC address of self.wifi_if.""" 246 return self._interface.mac_address 247 248 249 @property 250 def wifi_ip(self): 251 """@return string IPv4 address of self.wifi_if.""" 252 return self._interface.ipv4_address 253 254 255 @property 256 def wifi_ip_subnet(self): 257 """@return string IPv4 subnet prefix of self.wifi_if.""" 258 return self._interface.ipv4_subnet 259 260 261 @property 262 def wifi_phy_name(self): 263 """@return wiphy name (e.g., 'phy0') or None""" 264 return self._interface.wiphy_name 265 266 @property 267 def wifi_signal_level(self): 268 """Returns the signal level of this DUT's WiFi interface. 269 270 @return int signal level of connected WiFi interface or None (e.g. -67). 271 272 """ 273 return self._interface.signal_level 274 275 @property 276 def wifi_signal_level_all_chains(self): 277 """Returns the signal level of all chains of this DUT's WiFi interface. 278 279 @return int array signal level of each chain of connected WiFi interface 280 or None (e.g. [-67, -60]). 281 282 """ 283 return self._interface.signal_level_all_chains 284 285 @staticmethod 286 def assert_bsses_include_ssids(found_bsses, expected_ssids): 287 """Verifies that |found_bsses| includes |expected_ssids|. 288 289 @param found_bsses list of IwBss objects. 290 @param expected_ssids list of string SSIDs. 291 @raise error.TestFail if any element of |expected_ssids| is not found. 292 293 """ 294 for ssid in expected_ssids: 295 if not ssid: 296 continue 297 298 for bss in found_bsses: 299 if bss.ssid == ssid: 300 break 301 else: 302 raise error.TestFail('SSID %s is not in scan results: %r' % 303 (ssid, found_bsses)) 304 305 306 def wifi_noise_level(self, frequency_mhz): 307 """Returns the noise level of this DUT's WiFi interface. 308 309 @param frequency_mhz: frequency at which the noise level should be 310 measured and reported. 311 @return int signal level of connected WiFi interface in dBm (e.g. -67) 312 or None if the value is unavailable. 313 314 """ 315 return self._interface.noise_level(frequency_mhz) 316 317 318 def __init__(self, client_host, result_dir, use_wpa_cli): 319 """ 320 Construct a WiFiClient. 321 322 @param client_host host object representing a remote host. 323 @param result_dir string directory to store test logs/packet caps. 324 @param use_wpa_cli bool True if we want to use |wpa_cli| commands for 325 Android testing. 326 327 """ 328 super(WiFiClient, self).__init__(client_host, 'client', 329 inherit_interfaces=True) 330 self._command_ip = 'ip' 331 self._command_iptables = 'iptables -w 5' 332 self._command_ping6 = 'ping6' 333 self._command_wpa_cli = 'wpa_cli' 334 self._machine_id = None 335 self._result_dir = result_dir 336 self._conductive = None 337 338 if _is_eureka_host(self.host) and use_wpa_cli: 339 # Look up the WiFi device (and its MAC) on the client. 340 devs = self.iw_runner.list_interfaces(desired_if_type='managed') 341 devs = [dev for dev in devs 342 if dev.if_name not in self.WIFI_IF_BLOCKLIST] 343 if not devs: 344 raise error.TestFail('No wlan devices found on %s.' % 345 self.host.hostname) 346 347 if len(devs) > 1: 348 logging.warning('Warning, found multiple WiFi devices on ' 349 '%s: %r', self.host.hostname, devs) 350 self._wifi_if = devs[0].if_name 351 self._shill_proxy = wpa_cli_proxy.WpaCliProxy( 352 self.host, self._wifi_if) 353 self._wpa_cli_proxy = self._shill_proxy 354 else: 355 self._shill_proxy = get_xmlrpc_proxy(self.host) 356 interfaces = self._shill_proxy.list_controlled_wifi_interfaces() 357 if not interfaces: 358 logging.debug('No interfaces managed by shill. Rebooting host') 359 self.host.reboot() 360 raise error.TestError('No interfaces managed by shill on %s' % 361 self.host.hostname) 362 self._wifi_if = interfaces[0] 363 self._wpa_cli_proxy = wpa_cli_proxy.WpaCliProxy( 364 self.host, self._wifi_if) 365 self._raise_logging_level() 366 self._interface = interface.Interface(self._wifi_if, host=self.host) 367 self._wpa_mon = wpa_mon.WpaMon(self.host, self.wifi_if) 368 logging.debug('WiFi interface is: %r', 369 self._interface.device_description) 370 # All tests that use this object assume the interface starts enabled. 371 self.set_device_enabled(self._wifi_if, True) 372 # Turn off powersave mode by default. 373 self.powersave_switch(False) 374 # Invoke the |capabilities| property defined in the parent |Linuxsystem| 375 # to workaround the lazy loading of the capabilities cache and supported 376 # frequency list. This is needed for tests that may need access to these 377 # when the DUT is unreachable (for ex: suspended). 378 #pylint: disable=pointless-statement 379 self.capabilities 380 381 382 def _assert_method_supported(self, method_name): 383 """Raise a TestNAError if the XMLRPC proxy has no method |method_name|. 384 385 @param method_name: string name of method that should exist on the 386 XMLRPC proxy. 387 388 """ 389 if not self._supports_method(method_name): 390 raise error.TestNAError('%s() is not supported' % method_name) 391 392 393 def _raise_logging_level(self): 394 """Raises logging levels for WiFi on DUT.""" 395 self.host.run('ff_debug --level -2', ignore_status=True) 396 self.host.run('ff_debug +wifi', ignore_status=True) 397 398 399 def is_vht_supported(self): 400 """Returns True if VHT supported; False otherwise""" 401 return self.CAPABILITY_VHT in self.capabilities 402 403 404 def is_5ghz_supported(self): 405 """Returns True if 5Ghz bands are supported; False otherwise.""" 406 return self.CAPABILITY_5GHZ in self.capabilities 407 408 409 def is_ibss_supported(self): 410 """Returns True if IBSS mode is supported; False otherwise.""" 411 return self.CAPABILITY_IBSS in self.capabilities 412 413 414 def is_frequency_supported(self, frequency): 415 """Returns True if the given frequency is supported; False otherwise. 416 417 @param frequency: int Wifi frequency to check if it is supported by 418 DUT. 419 """ 420 return frequency in self.phys_for_frequency 421 422 423 def _supports_method(self, method_name): 424 """Checks if |method_name| is supported on the remote XMLRPC proxy. 425 426 autotest will, for their own reasons, install python files in the 427 autotest client package that correspond the version of the build 428 rather than the version running on the autotest drone. This 429 creates situations where we call methods on the client XMLRPC proxy 430 that don't exist in that version of the code. This detects those 431 situations so that we can degrade more or less gracefully. 432 433 @param method_name: string name of method that should exist on the 434 XMLRPC proxy. 435 @return True if method is available, False otherwise. 436 437 """ 438 supported = (_is_eureka_host(self.host) 439 or method_name in self._shill_proxy.system.listMethods()) 440 if not supported: 441 logging.warning('%s() is not supported on older images', 442 method_name) 443 return supported 444 445 446 def close(self): 447 """Tear down state associated with the client.""" 448 self.stop_capture() 449 self.powersave_switch(False) 450 self.shill.clean_profiles() 451 super(WiFiClient, self).close() 452 453 def sync_host_times(self): 454 """Set time on our DUT to match local time.""" 455 epoch_seconds = time.time() 456 self.shill.sync_time_to(epoch_seconds) 457 458 459 def collect_debug_info(self, local_save_dir_prefix): 460 """Collect any debug information needed from the DUT 461 462 This invokes the |collect_debug_info| RPC method to trigger 463 bugreport/logcat collection and then transfers the logs to the 464 server. 465 466 @param local_save_dir_prefix Used as a prefix for local save directory. 467 """ 468 pass 469 470 471 def check_iw_link_value(self, iw_link_key, desired_value): 472 """Assert that the current wireless link property is |desired_value|. 473 474 @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner. 475 @param desired_value string desired value of iw link property. 476 477 """ 478 actual_value = self.get_iw_link_value(iw_link_key) 479 desired_value = str(desired_value) 480 if actual_value != desired_value: 481 raise error.TestFail('Wanted iw link property %s value %s, but ' 482 'got %s instead.' % (iw_link_key, 483 desired_value, 484 actual_value)) 485 486 487 def get_iw_link_value(self, iw_link_key): 488 """Get the current value of a link property for this WiFi interface. 489 490 @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner. 491 492 """ 493 return self.iw_runner.get_link_value(self.wifi_if, iw_link_key) 494 495 496 def powersave_switch(self, turn_on): 497 """Toggle powersave mode for the DUT. 498 499 @param turn_on bool True iff powersave mode should be turned on. 500 501 """ 502 mode = 'off' 503 if turn_on: 504 mode = 'on' 505 # Turn ON interface and set power_save option. 506 self.host.run('ifconfig %s up' % self.wifi_if) 507 self.host.run('iw dev %s set power_save %s' % (self.wifi_if, mode)) 508 509 510 def timed_scan(self, frequencies, ssids, scan_timeout_seconds=10, 511 retry_timeout_seconds=10): 512 """Request timed scan to discover given SSIDs. 513 514 This method will retry for a default of |retry_timeout_seconds| until it 515 is able to successfully kick off a scan. Sometimes, the device on the 516 DUT claims to be busy and rejects our requests. It will raise error 517 if the scan did not complete within |scan_timeout_seconds| or it was 518 not able to discover the given SSIDs. 519 520 @param frequencies list of int WiFi frequencies to scan for. 521 @param ssids list of string ssids to probe request for. 522 @param scan_timeout_seconds: float number of seconds the scan 523 operation not to exceed. 524 @param retry_timeout_seconds: float number of seconds to retry scanning 525 if the interface is busy. This does not retry if certain 526 SSIDs are missing from the results. 527 @return time in seconds took to complete scan request. 528 529 """ 530 # the poll method returns the result of the func 531 scan_result = utils.poll_for_condition( 532 condition=lambda: self.iw_runner.timed_scan( 533 self.wifi_if, 534 frequencies=frequencies, 535 ssids=ssids), 536 exception=error.TestFail('Unable to trigger scan on client'), 537 timeout=retry_timeout_seconds, 538 sleep_interval=0.5) 539 # Verify scan operation completed within given timeout 540 if scan_result.time > scan_timeout_seconds: 541 raise error.TestFail('Scan time %.2fs exceeds the scan timeout' % 542 (scan_result.time)) 543 544 # Verify all ssids are discovered 545 self.assert_bsses_include_ssids(scan_result.bss_list, ssids) 546 547 logging.info('Wifi scan completed in %.2f seconds', scan_result.time) 548 return scan_result.time 549 550 551 def scan(self, frequencies, ssids, timeout_seconds=10, require_match=True): 552 """Request a scan and (optionally) check that requested SSIDs appear in 553 the results. 554 555 This method will retry for a default of |timeout_seconds| until it is 556 able to successfully kick off a scan. Sometimes, the device on the DUT 557 claims to be busy and rejects our requests. 558 559 If |ssids| is non-empty, we will speficially probe for those SSIDs. 560 561 If |require_match| is True, we will verify that every element 562 of |ssids| was found in scan results. 563 564 @param frequencies list of int WiFi frequencies to scan for. 565 @param ssids list of string ssids to probe request for. 566 @param timeout_seconds: float number of seconds to retry scanning 567 if the interface is busy. This does not retry if certain 568 SSIDs are missing from the results. 569 @param require_match: bool True if we must find |ssids|. 570 571 """ 572 bss_list = utils.poll_for_condition( 573 condition=lambda: self.iw_runner.scan( 574 self.wifi_if, 575 frequencies=frequencies, 576 ssids=ssids), 577 exception=error.TestFail('Unable to trigger scan on client'), 578 timeout=timeout_seconds, 579 sleep_interval=0.5) 580 581 if require_match: 582 self.assert_bsses_include_ssids(bss_list, ssids) 583 584 585 def wait_for_bss(self, bssid, timeout_seconds=15): 586 """Wait for a specific BSS to appear in the scan results. 587 588 @param bssid: string bssid of AP we expect to see in scan results 589 @param timeout_seconds int seconds to wait for BSSes to be discovered 590 591 """ 592 def dut_sees_bss(): 593 """Check if a DUT can see a BSS in scan results. 594 595 @return True iff scan results from DUT include the specified BSS. 596 597 """ 598 is_requested_bss = lambda iw_bss: iw_bss.bss == bssid 599 scan_results = self.iw_runner.scan(self.wifi_if) 600 return scan_results and list(filter(is_requested_bss, scan_results)) 601 try: 602 utils.poll_for_condition( 603 condition=dut_sees_bss, 604 timeout=timeout_seconds, 605 sleep_interval=0.5) 606 except: 607 raise error.TestFail('Failed to discover BSS %s' % bssid) 608 609 610 def wait_for_bsses(self, ssid, num_bss_expected, timeout_seconds=15): 611 """Wait for all BSSes associated with given SSID to be discovered in the 612 scan. 613 614 @param ssid string name of network being queried 615 @param num_bss_expected int number of BSSes expected 616 @param timeout_seconds int seconds to wait for BSSes to be discovered 617 618 """ 619 # If the scan returns None, return 0, else return the matching count 620 621 # Wrap num_bss_actual as a mutable object, list, so that an inner function 622 # can update the value without making an assignment to it. Without any 623 # assignment, the inner function will look for the variable in outer scope 624 # instead of creating a new local one. 625 num_bss_actual = [0] 626 627 def are_all_bsses_discovered(): 628 """Determine if all BSSes associated with the SSID from parent 629 function are discovered in the scan 630 631 @return boolean representing whether the expected bss count matches 632 how many in the scan match the given ssid 633 """ 634 self.claim_wifi_if() # Stop shill/supplicant scans 635 try: 636 scan_results = self.iw_runner.scan(self.wifi_if, 637 frequencies=[], 638 ssids=[ssid]) 639 if scan_results is None: 640 return False 641 num_bss_actual[0] = sum(ssid == bss.ssid 642 for bss in scan_results) 643 return num_bss_expected == num_bss_actual[0] 644 finally: 645 self.release_wifi_if() 646 647 try: 648 utils.poll_for_condition(condition=are_all_bsses_discovered, 649 timeout=timeout_seconds, 650 sleep_interval=0.5) 651 except utils.TimeoutError: 652 raise error.TestFail('Failed to discover all BSSes. Found %d,' 653 ' wanted %d with SSID %s' % 654 (num_bss_actual[0], num_bss_expected, ssid)) 655 656 def wait_for_service_states(self, ssid, states, timeout_seconds): 657 """Waits for a WiFi service to achieve one of |states|. 658 659 @param ssid string name of network being queried 660 @param states tuple list of states for which the caller is waiting 661 @param timeout_seconds int seconds to wait for a state in |states| 662 663 """ 664 logging.info('Waiting for %s to reach one of %r...', ssid, states) 665 success, state, duration = self._shill_proxy.wait_for_service_states( 666 ssid, states, timeout_seconds) 667 logging.info('...ended up in state \'%s\' (%s) after %f seconds.', 668 state, 'success' if success else 'failure', duration) 669 return success, state, duration 670 671 672 def do_suspend(self, seconds): 673 """Puts the DUT in suspend power state for |seconds| seconds. 674 675 @param seconds: The number of seconds to suspend the device. 676 677 """ 678 logging.info('Suspending DUT for %d seconds...', seconds) 679 self._shill_proxy.do_suspend(seconds) 680 logging.info('...done suspending') 681 682 683 def do_suspend_bg(self, seconds): 684 """Suspend DUT using the power manager - non-blocking. 685 686 @param seconds: The number of seconds to suspend the device. 687 688 """ 689 logging.info('Suspending DUT (in background) for %d seconds...', 690 seconds) 691 self._shill_proxy.do_suspend_bg(seconds) 692 693 694 def flush_bss(self, age=0): 695 """Flush supplicant's cached BSS on the DUT. 696 697 @param age: BSS older than |age| seconds will be removed from the cache. 698 """ 699 result = self._wpa_cli_proxy.run_wpa_cli_cmd('bss_flush %d' % age, 700 check_result=False); 701 logging.info('wpa_cli bss_flush %d: out:%r err:%r', age, result.stdout, 702 result.stderr) 703 return result.stdout, result.stderr 704 705 706 def clear_supplicant_blocklist(self): 707 """Clear's the AP blocklist on the DUT. 708 709 @return stdout and stderror returns passed from wpa_cli command. 710 711 """ 712 result = self._wpa_cli_proxy.run_wpa_cli_cmd('bssid_ignore clear', 713 check_result=False) 714 logging.info('wpa_cli bssid_ignore clear: out:%r err:%r', 715 result.stdout, result.stderr) 716 return result.stdout, result.stderr 717 718 719 def get_active_wifi_SSIDs(self): 720 """Get a list of visible SSID's around the DUT 721 722 @return list of string SSIDs 723 724 """ 725 self._assert_method_supported('get_active_wifi_SSIDs') 726 return self._shill_proxy.get_active_wifi_SSIDs() 727 728 729 def set_device_enabled(self, wifi_interface, value, 730 fail_on_unsupported=False): 731 """Enable or disable the WiFi device. 732 733 @param wifi_interface: string name of interface being modified. 734 @param enabled: boolean; true if this device should be enabled, 735 false if this device should be disabled. 736 @return True if it worked; False, otherwise. 737 738 """ 739 if fail_on_unsupported: 740 self._assert_method_supported('set_device_enabled') 741 elif not self._supports_method('set_device_enabled'): 742 return False 743 return self._shill_proxy.set_device_enabled(wifi_interface, value) 744 745 746 def add_arp_entry(self, ip_address, mac_address): 747 """Add an ARP entry to the table associated with the WiFi interface. 748 749 @param ip_address: string IP address associated with the new ARP entry. 750 @param mac_address: string MAC address associated with the new ARP 751 entry. 752 753 """ 754 self.host.run('ip neigh add %s lladdr %s dev %s nud perm' % 755 (ip_address, mac_address, self.wifi_if)) 756 757 def add_wake_packet_source(self, source_ip): 758 """Add |source_ip| as a source that can wake us up with packets. 759 760 @param source_ip: IP address from which to wake upon receipt of packets 761 762 @return True if successful, False otherwise. 763 764 """ 765 return self._shill_proxy.add_wake_packet_source( 766 self.wifi_if, source_ip) 767 768 769 def remove_wake_packet_source(self, source_ip): 770 """Remove |source_ip| as a source that can wake us up with packets. 771 772 @param source_ip: IP address to stop waking on packets from 773 774 @return True if successful, False otherwise. 775 776 """ 777 return self._shill_proxy.remove_wake_packet_source( 778 self.wifi_if, source_ip) 779 780 781 def remove_all_wake_packet_sources(self): 782 """Remove all IPs as sources that can wake us up with packets. 783 784 @return True if successful, False otherwise. 785 786 """ 787 return self._shill_proxy.remove_all_wake_packet_sources(self.wifi_if) 788 789 790 def wake_on_wifi_features(self, features): 791 """Shill supports programming the NIC to wake on special kinds of 792 incoming packets, or on changes to the available APs (disconnect, 793 coming in range of a known SSID). This method allows you to configure 794 what wake-on-WiFi mechanisms are active. It returns a context manager, 795 because this is a system-wide setting and we don't want it to persist 796 across different tests. 797 798 If you enable wake-on-packet, then the IPs registered by 799 add_wake_packet_source will be able to wake the system from suspend. 800 801 The correct way to use this method is: 802 803 with client.wake_on_wifi_features(WAKE_ON_WIFI_DARKCONNECT): 804 ... 805 806 @param features: string from the WAKE_ON_WIFI constants above. 807 808 @return a context manager for the features. 809 810 """ 811 return TemporaryDeviceDBusProperty(self._shill_proxy, 812 self.wifi_if, 813 self.WAKE_ON_WIFI_FEATURES, 814 features) 815 816 817 def net_detect_scan_period_seconds(self, period): 818 """Sets the period between net detect scans performed by the NIC to look 819 for allowlisted SSIDs to |period|. This setting only takes effect if the 820 NIC is programmed to wake on SSID. 821 822 The correct way to use this method is: 823 824 with client.net_detect_scan_period_seconds(60): 825 ... 826 827 @param period: integer number of seconds between NIC net detect scans 828 829 @return a context manager for the net detect scan period 830 831 """ 832 return TemporaryDeviceDBusProperty(self._shill_proxy, 833 self.wifi_if, 834 self.NET_DETECT_SCAN_PERIOD, 835 period) 836 837 838 def wake_to_scan_period_seconds(self, period): 839 """Sets the period between RTC timer wakeups where the system is woken 840 from suspend to perform scans. This setting only takes effect if the 841 NIC is programmed to wake on SSID. Use as with 842 net_detect_scan_period_seconds. 843 844 @param period: integer number of seconds between wake to scan RTC timer 845 wakes. 846 847 @return a context manager for the net detect scan period 848 849 """ 850 return TemporaryDeviceDBusProperty(self._shill_proxy, 851 self.wifi_if, 852 self.WAKE_TO_SCAN_PERIOD, 853 period) 854 855 856 def force_wake_to_scan_timer(self, is_forced): 857 """Sets the boolean value determining whether or not to force the use of 858 the wake to scan RTC timer, which wakes the system from suspend 859 periodically to scan for networks. 860 861 @param is_forced: boolean whether or not to force the use of the wake to 862 scan timer 863 864 @return a context manager for the net detect scan period 865 866 """ 867 return TemporaryDeviceDBusProperty(self._shill_proxy, 868 self.wifi_if, 869 self.FORCE_WAKE_TO_SCAN_TIMER, 870 is_forced) 871 872 873 def mac_address_randomization(self, enabled): 874 """Sets the boolean value determining whether or not to enable MAC 875 address randomization. This instructs the NIC to randomize the last 876 three octets of the MAC address used in probe requests while 877 disconnected to make the DUT harder to track. 878 879 If MAC address randomization is not supported on this DUT and the 880 caller tries to turn it on, this raises a TestNAError. 881 882 @param enabled: boolean whether or not to enable MAC address 883 randomization 884 885 @return a context manager for the MAC address randomization property 886 887 """ 888 if not self._shill_proxy.get_dbus_property_on_device( 889 self.wifi_if, self.MAC_ADDRESS_RANDOMIZATION_SUPPORTED): 890 if enabled: 891 raise error.TestNAError( 892 'MAC address randomization not supported') 893 else: 894 # Return a no-op context manager. 895 return contextlib.nested() 896 897 return TemporaryDeviceDBusProperty( 898 self._shill_proxy, 899 self.wifi_if, 900 self.MAC_ADDRESS_RANDOMIZATION_ENABLED, 901 enabled) 902 903 904 def request_roam(self, bssid): 905 """Request that we roam to the specified BSSID. 906 907 Note that this operation assumes that: 908 909 1) We're connected to an SSID for which |bssid| is a member. 910 2) There is a BSS with an appropriate ID in our scan results. 911 912 This method does not check for success of either the command or 913 the roaming operation. 914 915 @param bssid: string MAC address of bss to roam to. 916 917 """ 918 self._wpa_cli_proxy.run_wpa_cli_cmd('roam %s' % bssid, 919 check_result=False); 920 return True 921 922 923 def request_roam_dbus(self, bssid, iface): 924 """Request that we roam to the specified BSSID through dbus. 925 926 Note that this operation assumes that: 927 928 1) We're connected to an SSID for which |bssid| is a member. 929 2) There is a BSS with an appropriate ID in our scan results. 930 931 @param bssid: string MAC address of bss to roam to. 932 @param iface: interface to use 933 @return True if the roam was initiated successfully. Note that this 934 does not guarantee the roam completed successfully. 935 936 """ 937 self._assert_method_supported('request_roam_dbus') 938 return self._shill_proxy.request_roam_dbus(bssid, iface) 939 940 941 def wait_for_roam(self, bssid, timeout_seconds=10.0): 942 """Wait for a roam to the given |bssid|. 943 944 @param bssid: string bssid to expect a roam to 945 (e.g. '00:11:22:33:44:55'). 946 @param timeout_seconds: float number of seconds to wait for a roam. 947 @return True if we detect an association to the given |bssid| within 948 |timeout_seconds|. 949 950 """ 951 def attempt_roam(): 952 """Perform a roam to the given |bssid| 953 954 @return True if there is an assocation between the given |bssid| 955 and that association is detected within the timeout frame 956 """ 957 current_bssid = self.iw_runner.get_current_bssid(self.wifi_if) 958 logging.debug('Current BSSID is %s.', current_bssid) 959 return current_bssid == bssid 960 961 start_time = time.time() 962 duration = 0.0 963 try: 964 success = utils.poll_for_condition( 965 condition=attempt_roam, 966 timeout=timeout_seconds, 967 sleep_interval=0.5, 968 desc='Wait for a roam to the given bssid') 969 # wait_for_roam should return False on timeout 970 except utils.TimeoutError: 971 success = False 972 973 duration = time.time() - start_time 974 logging.debug('%s to %s in %f seconds.', 975 'Roamed ' if success else 'Failed to roam ', 976 bssid, 977 duration) 978 return success 979 980 981 def wait_for_ssid_vanish(self, ssid): 982 """Wait for shill to notice that there are no BSS's for an SSID present. 983 984 Raise a test failing exception if this does not come to pass. 985 986 @param ssid: string SSID of the network to require be missing. 987 988 """ 989 def is_missing_yet(): 990 """Determine if the ssid is removed from the service list yet 991 992 @return True if the ssid is not found in the service list 993 """ 994 visible_ssids = self.get_active_wifi_SSIDs() 995 logging.info('Got service list: %r', visible_ssids) 996 if ssid not in visible_ssids: 997 return True 998 self.scan(frequencies=[], ssids=[], timeout_seconds=30) 999 1000 utils.poll_for_condition( 1001 condition=is_missing_yet, # func 1002 exception=error.TestFail('shill should mark BSS not present'), 1003 timeout=self.MAX_SERVICE_GONE_TIMEOUT_SECONDS, 1004 sleep_interval=0) 1005 1006 def reassociate(self, timeout_seconds=10): 1007 """Reassociate to the connected network. 1008 1009 @param timeout_seconds: float number of seconds to wait for operation 1010 to complete. 1011 1012 """ 1013 logging.info('Attempt to reassociate') 1014 with self.iw_runner.get_event_logger() as logger: 1015 logger.start() 1016 # Issue reattach command to wpa_supplicant 1017 self._wpa_cli_proxy.run_wpa_cli_cmd('reattach'); 1018 1019 # Wait for the timeout seconds for association to complete 1020 time.sleep(timeout_seconds) 1021 1022 # Stop iw event logger 1023 logger.stop() 1024 1025 # Get association time based on the iw event log 1026 reassociate_time = logger.get_reassociation_time() 1027 if reassociate_time is None or reassociate_time > timeout_seconds: 1028 raise error.TestFail( 1029 'Failed to reassociate within given timeout') 1030 logging.info('Reassociate time: %.2f seconds', reassociate_time) 1031 1032 1033 def wait_for_connection(self, 1034 ssid, 1035 timeout_seconds=30, 1036 freq=None, 1037 ping_ip=None, 1038 desired_subnet=None, 1039 source_iface=None): 1040 """Verifies a connection to network ssid, optionally verifying 1041 frequency, ping connectivity and subnet. 1042 1043 @param ssid string ssid of the network to check. 1044 @param timeout_seconds int number of seconds to wait for 1045 connection on the given frequency. 1046 @param freq int frequency of network to check. 1047 @param ping_ip string ip address to ping for verification. 1048 @param desired_subnet string expected subnet in which client 1049 ip address should reside. 1050 1051 @returns a named tuple of (state, time) 1052 """ 1053 start_time = time.time() 1054 duration = lambda: time.time() - start_time 1055 1056 # Wrap state as a mutable object, list, so that an inner function can 1057 # update the value without making an assignment to it. Without any 1058 # assignment, the inner function will look for the variable in outer 1059 # scope instead of creating a new local one. 1060 state = [None] 1061 def verify_connection(): 1062 """Verify the connection and perform optional operations 1063 as defined in the parent function 1064 1065 @return False if there is a failure in the connection or 1066 the prescribed verification steps 1067 The named tuple ConnectTime otherwise, with the state 1068 and connection time from waiting for service states 1069 """ 1070 success, state[0], conn_time = self.wait_for_service_states( 1071 ssid, 1072 self.CONNECTED_STATES, 1073 int(math.ceil(timeout_seconds - duration()))) 1074 if not success: 1075 return False 1076 1077 if freq: 1078 actual_freq = self.get_iw_link_value( 1079 iw_runner.IW_LINK_KEY_FREQUENCY) 1080 if str(freq) != actual_freq: 1081 logging.debug( 1082 'Waiting for desired frequency %s (got %s).', 1083 freq, 1084 actual_freq) 1085 return False 1086 1087 if desired_subnet: 1088 actual_subnet = self.wifi_ip_subnet 1089 if actual_subnet != desired_subnet: 1090 logging.debug( 1091 'Waiting for desired subnet %s (got %s).', 1092 desired_subnet, 1093 actual_subnet) 1094 return False 1095 1096 if ping_ip: 1097 ping_config = ping_runner.PingConfig(ping_ip, 1098 source_iface=source_iface) 1099 self.ping(ping_config) 1100 1101 return ConnectTime(state[0], conn_time) 1102 1103 freq_error_str = (' on frequency %d Mhz' % freq) if freq else '' 1104 1105 try: 1106 ret = utils.poll_for_condition( 1107 condition=verify_connection, 1108 timeout=timeout_seconds, 1109 sleep_interval=0) 1110 except utils.TimeoutError: 1111 raise error.TestFail( 1112 'Failed to connect to "%s"%s in %f seconds (state=%s)' % 1113 (ssid, 1114 freq_error_str, 1115 duration(), 1116 state[0])) 1117 return ret 1118 1119 @contextmanager 1120 def assert_disconnect_count(self, count): 1121 """Context asserting |count| disconnects for the context lifetime. 1122 1123 Creates an iw logger during the lifetime of the context and asserts 1124 that the client disconnects exactly |count| times. 1125 1126 @param count int the expected number of disconnections. 1127 1128 """ 1129 with self.iw_runner.get_event_logger() as logger: 1130 logger.start() 1131 yield 1132 logger.stop() 1133 if logger.get_disconnect_count() != count: 1134 raise error.TestFail( 1135 'Client disconnected %d times; expected %d' % 1136 (logger.get_disconnect_count(), count)) 1137 1138 1139 def assert_no_disconnects(self): 1140 """Context asserting no disconnects for the context lifetime.""" 1141 return self.assert_disconnect_count(0) 1142 1143 1144 @contextmanager 1145 def assert_disconnect_event(self): 1146 """Context asserting at least one disconnect for the context lifetime. 1147 1148 Creates an iw logger during the lifetime of the context and asserts 1149 that the client disconnects at least one time. 1150 1151 """ 1152 with self.iw_runner.get_event_logger() as logger: 1153 logger.start() 1154 yield 1155 logger.stop() 1156 if logger.get_disconnect_count() == 0: 1157 raise error.TestFail('Client did not disconnect') 1158 1159 1160 def get_num_card_resets(self): 1161 """Get card reset count.""" 1162 reset_msg = '[m]wifiex_sdio_card_reset' 1163 result = self.host.run('grep -c %s /var/log/messages' % reset_msg, 1164 ignore_status=True) 1165 if result.exit_status == 1: 1166 return 0 1167 count = int(result.stdout.strip()) 1168 return count 1169 1170 1171 def get_disconnect_reasons(self): 1172 """Get disconnect reason codes.""" 1173 disconnect_reason_msg = "updated DisconnectReason " 1174 disconnect_reason_cleared = "clearing DisconnectReason for " 1175 result = self.host.run('grep -a -E "(%s|%s)" /var/log/net.log' % 1176 (disconnect_reason_msg, 1177 disconnect_reason_cleared), 1178 ignore_status=True) 1179 if result.exit_status == 1: 1180 return None 1181 1182 lines = result.stdout.strip().split('\n') 1183 disconnect_reasons = [] 1184 disconnect_reason_regex = re.compile(r' to (\D?\d+)') 1185 1186 found = False 1187 for line in reversed(lines): 1188 match = disconnect_reason_regex.search(line) 1189 if match is not None: 1190 disconnect_reasons.append(match.group(1)) 1191 found = True 1192 else: 1193 if (found): 1194 break 1195 return list(reversed(disconnect_reasons)) 1196 1197 1198 def release_wifi_if(self): 1199 """Release the control over the wifi interface back to normal operation. 1200 1201 This will give the ownership of the wifi interface back to shill and 1202 wpa_supplicant. 1203 1204 """ 1205 self.set_device_enabled(self._wifi_if, True) 1206 1207 1208 def claim_wifi_if(self): 1209 """Claim the control over the wifi interface from this wifi client. 1210 1211 This claim the ownership of the wifi interface from shill and 1212 wpa_supplicant. The wifi interface should be UP when this call returns. 1213 1214 """ 1215 # Disabling a wifi device in shill will remove that device from 1216 # wpa_supplicant as well. 1217 self.set_device_enabled(self._wifi_if, False) 1218 1219 # Wait for shill to bring down the wifi interface. 1220 is_interface_down = lambda: not self._interface.is_up 1221 utils.poll_for_condition( 1222 is_interface_down, 1223 timeout=INTERFACE_DOWN_WAIT_TIME_SECONDS, 1224 sleep_interval=0.5, 1225 desc='Timeout waiting for interface to go down.') 1226 # Bring up the wifi interface to allow the test to use the interface. 1227 self.host.run('%s link set %s up' % (self.cmd_ip, self.wifi_if)) 1228 1229 1230 def set_sched_scan(self, enable, fail_on_unsupported=False): 1231 """enable/disable scheduled scan. 1232 1233 @param enable bool flag indicating to enable/disable scheduled scan. 1234 1235 """ 1236 if fail_on_unsupported: 1237 self._assert_method_supported('set_sched_scan') 1238 elif not self._supports_method('set_sched_scan'): 1239 return False 1240 return self._shill_proxy.set_sched_scan(enable) 1241 1242 1243 def check_connected_on_last_resume(self): 1244 """Checks whether the DUT was connected on its last resume. 1245 1246 Checks that the DUT was connected after waking from suspend by parsing 1247 the last instance shill log message that reports shill's connection 1248 status on resume. Fails the test if this log message reports that 1249 the DUT woke up disconnected. 1250 1251 """ 1252 # As of build R43 6913.0.0, the shill log message from the function 1253 # OnAfterResume is called as soon as shill resumes from suspend, and 1254 # will report whether or not shill is connected. The log message will 1255 # take one of the following two forms: 1256 # 1257 # [...] [INFO:wifi.cc(1941)] OnAfterResume: connected 1258 # [...] [INFO:wifi.cc(1941)] OnAfterResume: not connected 1259 # 1260 # where 1941 is an arbitrary PID number. By checking if the last 1261 # instance of this message contains the substring "not connected", we 1262 # can determine whether or not shill was connected on its last resume. 1263 connection_status_log_regex_str = str(r'INFO:wifi\.cc.*OnAfterResume') 1264 not_connected_substr = str(r'not connected') 1265 connected_substr = str(r'connected') 1266 1267 cmd = ('grep -E %s /var/log/net.log | tail -1' % 1268 connection_status_log_regex_str) 1269 connection_status_log = self.host.run(cmd).stdout 1270 if not connection_status_log: 1271 raise error.TestFail('Could not find resume connection status log ' 1272 'message.') 1273 1274 logging.debug('Connection status message:\n%s', connection_status_log) 1275 if not_connected_substr in connection_status_log: 1276 raise error.TestFail('Client was not connected upon waking from ' 1277 'suspend.') 1278 1279 if not connected_substr in connection_status_log: 1280 raise error.TestFail('Last resume log message did not contain ' 1281 'connection status.') 1282 1283 logging.info('Client was connected upon waking from suspend.') 1284 1285 1286 def check_wake_on_wifi_throttled(self): 1287 """ 1288 Checks whether wake on WiFi was throttled on the DUT on the last dark 1289 resume. Check for this by parsing shill logs for a throttling message. 1290 1291 """ 1292 # We are looking for an dark resume error log message indicating that 1293 # wake on WiFi was throttled. This is an example of the error message: 1294 # [...] [ERROR:wake_on_wifi.cc(1304)] OnDarkResume: Too many dark \ 1295 # resumes; disabling wake on WiFi temporarily 1296 dark_resume_log_regex_str = str(r'ERROR:wake_on_wifi\.cc.*OnDarkResume:.*') 1297 throttled_msg_substr = ('Too many dark resumes; disabling wake on ' 1298 'WiFi temporarily') 1299 1300 cmd = ('grep -E %s /var/log/net.log | tail -1' % 1301 dark_resume_log_regex_str) 1302 last_dark_resume_error_log = self.host.run(cmd).stdout 1303 if not last_dark_resume_error_log: 1304 raise error.TestFail('Could not find a dark resume log message.') 1305 1306 logging.debug('Last dark resume log message:\n%s', 1307 last_dark_resume_error_log) 1308 if not throttled_msg_substr in last_dark_resume_error_log: 1309 raise error.TestFail('Wake on WiFi was not throttled on the last ' 1310 'dark resume.') 1311 1312 logging.info('Wake on WiFi was throttled on the last dark resume.') 1313 1314 1315 def shill_debug_log(self, message): 1316 """Logs a message to the shill log (i.e. /var/log/net.log). This 1317 message will be logged at the DEBUG level. 1318 1319 @param message: the message to be logged. 1320 1321 """ 1322 logging.info(message) 1323 logger_command = ('/usr/bin/logger' 1324 ' --tag shill' 1325 ' --priority daemon.debug' 1326 ' "%s"' % utils.sh_escape(message)) 1327 self.host.run(logger_command) 1328 1329 1330 def is_wake_on_wifi_supported(self): 1331 """Returns true iff wake-on-WiFi is supported by the DUT.""" 1332 1333 if (self.shill.get_dbus_property_on_device( 1334 self.wifi_if, self.WAKE_ON_WIFI_FEATURES) == 1335 WAKE_ON_WIFI_NOT_SUPPORTED): 1336 return False 1337 return True 1338 1339 1340 def set_manager_property(self, prop_name, prop_value): 1341 """Sets the given manager property to the value provided. 1342 1343 @param prop_name: the property to be set 1344 @param prop_value: value to assign to the prop_name 1345 @return a context manager for the setting 1346 1347 """ 1348 return TemporaryManagerDBusProperty(self._shill_proxy, 1349 prop_name, 1350 prop_value) 1351 1352 1353class TemporaryDeviceDBusProperty: 1354 """Utility class to temporarily change a dbus property for the WiFi device. 1355 1356 Since dbus properties are global and persistent settings, we want 1357 to make sure that we change them back to what they were before the test 1358 started. 1359 1360 """ 1361 1362 def __init__(self, shill_proxy, iface, prop_name, value): 1363 """Construct a TemporaryDeviceDBusProperty context manager. 1364 1365 1366 @param shill_proxy: the shill proxy to use to communicate via dbus 1367 @param iface: device whose property to change (e.g. 'wlan0') 1368 @param prop_name: the name of the property we want to set 1369 @param value: the desired value of the property 1370 1371 """ 1372 self._shill = shill_proxy 1373 self._interface = iface 1374 self._prop_name = prop_name 1375 self._value = value 1376 self._saved_value = None 1377 1378 1379 def __enter__(self): 1380 logging.info('- Setting property %s on device %s', 1381 self._prop_name, 1382 self._interface) 1383 1384 self._saved_value = self._shill.get_dbus_property_on_device( 1385 self._interface, self._prop_name) 1386 if self._saved_value is None: 1387 raise error.TestFail('Device or property not found.') 1388 if not self._shill.set_dbus_property_on_device(self._interface, 1389 self._prop_name, 1390 self._value): 1391 raise error.TestFail('Could not set property') 1392 1393 logging.info('- Changed value from %s to %s', 1394 self._saved_value, 1395 self._value) 1396 1397 1398 def __exit__(self, exception, value, traceback): 1399 logging.info('- Resetting property %s', self._prop_name) 1400 1401 if not self._shill.set_dbus_property_on_device(self._interface, 1402 self._prop_name, 1403 self._saved_value): 1404 raise error.TestFail('Could not reset property') 1405 1406 1407class TemporaryManagerDBusProperty: 1408 """Utility class to temporarily change a Manager dbus property. 1409 1410 Since dbus properties are global and persistent settings, we want 1411 to make sure that we change them back to what they were before the test 1412 started. 1413 1414 """ 1415 1416 def __init__(self, shill_proxy, prop_name, value): 1417 """Construct a TemporaryManagerDBusProperty context manager. 1418 1419 @param shill_proxy: the shill proxy to use to communicate via dbus 1420 @param prop_name: the name of the property we want to set 1421 @param value: the desired value of the property 1422 1423 """ 1424 self._shill = shill_proxy 1425 self._prop_name = prop_name 1426 self._value = value 1427 self._saved_value = None 1428 1429 1430 def __enter__(self): 1431 logging.info('- Setting Manager property: %s', self._prop_name) 1432 1433 self._saved_value = self._shill.get_manager_property( 1434 self._prop_name) 1435 if self._saved_value is None: 1436 self._saved_value = "" 1437 if not self._shill.set_optional_manager_property(self._prop_name, 1438 self._value): 1439 raise error.TestFail('Could not set optional manager property.') 1440 else: 1441 setprop_result = self._shill.set_manager_property(self._prop_name, 1442 self._value) 1443 if not setprop_result: 1444 raise error.TestFail('Could not set manager property') 1445 1446 logging.info('- Changed value from [%s] to [%s]', 1447 self._saved_value, 1448 self._value) 1449 1450 1451 def __exit__(self, exception, value, traceback): 1452 logging.info('- Resetting property %s to [%s]', 1453 self._prop_name, 1454 self._saved_value) 1455 1456 if not self._shill.set_manager_property(self._prop_name, 1457 self._saved_value): 1458 raise error.TestFail('Could not reset manager property') 1459