• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import contextlib
7import logging
8import math
9import re
10import time
11
12from contextlib import contextmanager
13from collections import namedtuple
14
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.common_lib import utils
17from autotest_lib.client.common_lib.cros.network import interface
18from autotest_lib.client.common_lib.cros.network import iw_runner
19from autotest_lib.client.common_lib.cros.network import ping_runner
20from autotest_lib.client.cros import constants
21from autotest_lib.server import autotest
22from autotest_lib.server import site_linux_system
23from autotest_lib.server.cros.network import wpa_cli_proxy
24from autotest_lib.server.cros.network import wpa_mon
25from autotest_lib.server.hosts import cast_os_host
26
27# Wake-on-WiFi feature strings
28WAKE_ON_WIFI_NONE = 'none'
29WAKE_ON_WIFI_PACKET = 'packet'
30WAKE_ON_WIFI_DARKCONNECT = 'darkconnect'
31WAKE_ON_WIFI_PACKET_DARKCONNECT = 'packet_and_darkconnect'
32WAKE_ON_WIFI_NOT_SUPPORTED = 'not_supported'
33
34# Wake-on-WiFi test timing constants
35SUSPEND_WAIT_TIME_SECONDS = 10
36RECEIVE_PACKET_WAIT_TIME_SECONDS = 10
37DARK_RESUME_WAIT_TIME_SECONDS = 25
38WAKE_TO_SCAN_PERIOD_SECONDS = 30
39NET_DETECT_SCAN_WAIT_TIME_SECONDS = 15
40WAIT_UP_TIMEOUT_SECONDS = 10
41DISCONNECT_WAIT_TIME_SECONDS = 10
42INTERFACE_DOWN_WAIT_TIME_SECONDS = 10
43
44ConnectTime = namedtuple('ConnectTime', 'state, time')
45
46XMLRPC_BRINGUP_TIMEOUT_SECONDS = 60
47SHILL_XMLRPC_LOG_PATH = '/var/log/shill_xmlrpc_server.log'
48
49
50def _is_eureka_host(host):
51    return host.get_os_type() == cast_os_host.OS_TYPE_CAST_OS
52
53
54def get_xmlrpc_proxy(host):
55    """Get a shill XMLRPC proxy for |host|.
56
57    The returned object has no particular type.  Instead, when you call
58    a method on the object, it marshalls the objects passed as arguments
59    and uses them to make RPCs on the remote server.  Thus, you should
60    read shill_xmlrpc_server.py to find out what methods are supported.
61
62    @param host: host object representing a remote device.
63    @return proxy object for remote XMLRPC server.
64
65    """
66    # Make sure the client library is on the device so that the proxy
67    # code is there when we try to call it.
68    if host.is_client_install_supported:
69        client_at = autotest.Autotest(host)
70        client_at.install()
71    # This is the default port for shill xmlrpc server.
72    server_port = constants.SHILL_XMLRPC_SERVER_PORT
73    xmlrpc_server_command = constants.SHILL_XMLRPC_SERVER_COMMAND
74    log_path = SHILL_XMLRPC_LOG_PATH
75    command_name = constants.SHILL_XMLRPC_SERVER_CLEANUP_PATTERN
76    rpc_server_host = host
77
78    # Start up the XMLRPC proxy on the client
79    proxy = rpc_server_host.rpc_server_tracker.xmlrpc_connect(
80            xmlrpc_server_command,
81            server_port,
82            command_name=command_name,
83            ready_test_name=constants.SHILL_XMLRPC_SERVER_READY_METHOD,
84            timeout_seconds=XMLRPC_BRINGUP_TIMEOUT_SECONDS,
85            logfile=log_path
86      )
87    return proxy
88
89
90def _is_conductive(host):
91    """Determine if the host is conductive based on AFE labels.
92
93    @param host: A Host object.
94    """
95    info = host.host_info_store.get()
96    conductive = info.get_label_value('conductive')
97    return conductive.lower() == 'true'
98
99
100class WiFiClient(site_linux_system.LinuxSystem):
101    """WiFiClient is a thin layer of logic over a remote DUT in wifitests."""
102
103    DEFAULT_PING_COUNT = 10
104    COMMAND_PING = 'ping'
105
106    MAX_SERVICE_GONE_TIMEOUT_SECONDS = 60
107
108    # List of interface names we won't consider for use as "the" WiFi interface
109    # on Android or CastOS hosts.
110    WIFI_IF_BLOCKLIST = ['p2p0', 'wfd0']
111
112    UNKNOWN_BOARD_TYPE = 'unknown'
113
114    # DBus device properties. Wireless interfaces should support these.
115    WAKE_ON_WIFI_FEATURES = 'WakeOnWiFiFeaturesEnabled'
116    NET_DETECT_SCAN_PERIOD = 'NetDetectScanPeriodSeconds'
117    WAKE_TO_SCAN_PERIOD = 'WakeToScanPeriodSeconds'
118    FORCE_WAKE_TO_SCAN_TIMER = 'ForceWakeToScanTimer'
119    MAC_ADDRESS_RANDOMIZATION_SUPPORTED = 'MACAddressRandomizationSupported'
120    MAC_ADDRESS_RANDOMIZATION_ENABLED = 'MACAddressRandomizationEnabled'
121
122    CONNECTED_STATES = ['portal', 'no-connectivity', 'redirect-found',
123                        'portal-suspected', 'online', 'ready']
124
125    @property
126    def machine_id(self):
127        """@return string unique to a particular board/cpu configuration."""
128        if self._machine_id:
129            return self._machine_id
130
131        uname_result = self.host.run('uname -m', ignore_status=True)
132        kernel_arch = ''
133        if not uname_result.exit_status and uname_result.stdout.find(' ') < 0:
134            kernel_arch = uname_result.stdout.strip()
135        cpu_info = self.host.run('cat /proc/cpuinfo').stdout.splitlines()
136        cpu_count = len([x for x in cpu_info if x.lower().startswith('bogomips')])
137        cpu_count_str = ''
138        if cpu_count:
139            cpu_count_str = 'x%d' % cpu_count
140        ghz_value = ''
141        ghz_pattern = re.compile('([0-9.]+GHz)')
142        for line in cpu_info:
143            match = ghz_pattern.search(line)
144            if match is not None:
145                ghz_value = '_' + match.group(1)
146                break
147
148        return '%s_%s%s%s' % (self.board, kernel_arch, ghz_value, cpu_count_str)
149
150
151    @property
152    def powersave_on(self):
153        """@return bool True iff WiFi powersave mode is enabled."""
154        result = self.host.run("iw dev %s get power_save" % self.wifi_if)
155        output = result.stdout.rstrip()       # NB: chop \n
156        # Output should be either "Power save: on" or "Power save: off".
157        find_re = re.compile(r'([^:]+):\s+(\w+)')
158        find_results = find_re.match(output)
159        if not find_results:
160            raise error.TestFail('Failed to find power_save parameter '
161                                 'in iw results.')
162
163        return find_results.group(2) == 'on'
164
165
166    @property
167    def shill(self):
168        """@return shill RPCProxy object."""
169        return self._shill_proxy
170
171
172    @property
173    def client(self):
174        """Deprecated accessor for the client host.
175
176        The term client is used very loosely in old autotests and this
177        accessor should not be used in new code.  Use host() instead.
178
179        @return host object representing a remote DUT.
180
181        """
182        return self.host
183
184
185    @property
186    def command_ip(self):
187        """@return string path to ip command."""
188        return self._command_ip
189
190
191    @property
192    def command_iptables(self):
193        """@return string path to iptables command."""
194        return self._command_iptables
195
196
197    @property
198    def command_ping6(self):
199        """@return string path to ping6 command."""
200        return self._command_ping6
201
202
203    @property
204    def command_wpa_cli(self):
205        """@return string path to wpa_cli command."""
206        return self._command_wpa_cli
207
208
209    @property
210    def conductive(self):
211        """@return True if the rig is conductive; False otherwise."""
212        if self._conductive is None:
213            self._conductive = _is_conductive(self.host)
214        return self._conductive
215
216
217    @conductive.setter
218    def conductive(self, value):
219        """Set the conductive member to True or False.
220
221        @param value: boolean value to set the conductive member to.
222        """
223        self._conductive = value
224
225
226    @property
227    def module_name(self):
228        """@return Name of kernel module in use by this interface."""
229        return self._interface.module_name
230
231    @property
232    def parent_device_name(self):
233        """
234        @return Path of the parent device for the net device"""
235        return self._interface.parent_device_name
236
237    @property
238    def wifi_if(self):
239        """@return string wifi device on machine (e.g. mlan0)."""
240        return self._wifi_if
241
242
243    @property
244    def wifi_mac(self):
245        """@return string MAC address of self.wifi_if."""
246        return self._interface.mac_address
247
248
249    @property
250    def wifi_ip(self):
251        """@return string IPv4 address of self.wifi_if."""
252        return self._interface.ipv4_address
253
254
255    @property
256    def wifi_ip_subnet(self):
257        """@return string IPv4 subnet prefix of self.wifi_if."""
258        return self._interface.ipv4_subnet
259
260
261    @property
262    def wifi_phy_name(self):
263        """@return wiphy name (e.g., 'phy0') or None"""
264        return self._interface.wiphy_name
265
266    @property
267    def wifi_signal_level(self):
268        """Returns the signal level of this DUT's WiFi interface.
269
270        @return int signal level of connected WiFi interface or None (e.g. -67).
271
272        """
273        return self._interface.signal_level
274
275    @property
276    def wifi_signal_level_all_chains(self):
277        """Returns the signal level of all chains of this DUT's WiFi interface.
278
279        @return int array signal level of each chain of connected WiFi interface
280                or None (e.g. [-67, -60]).
281
282        """
283        return self._interface.signal_level_all_chains
284
285    @staticmethod
286    def assert_bsses_include_ssids(found_bsses, expected_ssids):
287        """Verifies that |found_bsses| includes |expected_ssids|.
288
289        @param found_bsses list of IwBss objects.
290        @param expected_ssids list of string SSIDs.
291        @raise error.TestFail if any element of |expected_ssids| is not found.
292
293        """
294        for ssid in expected_ssids:
295            if not ssid:
296                continue
297
298            for bss in found_bsses:
299                if bss.ssid == ssid:
300                    break
301            else:
302                raise error.TestFail('SSID %s is not in scan results: %r' %
303                                     (ssid, found_bsses))
304
305
306    def wifi_noise_level(self, frequency_mhz):
307        """Returns the noise level of this DUT's WiFi interface.
308
309        @param frequency_mhz: frequency at which the noise level should be
310               measured and reported.
311        @return int signal level of connected WiFi interface in dBm (e.g. -67)
312                or None if the value is unavailable.
313
314        """
315        return self._interface.noise_level(frequency_mhz)
316
317
318    def __init__(self, client_host, result_dir, use_wpa_cli):
319        """
320        Construct a WiFiClient.
321
322        @param client_host host object representing a remote host.
323        @param result_dir string directory to store test logs/packet caps.
324        @param use_wpa_cli bool True if we want to use |wpa_cli| commands for
325               Android testing.
326
327        """
328        super(WiFiClient, self).__init__(client_host, 'client',
329                                         inherit_interfaces=True)
330        self._command_ip = 'ip'
331        self._command_iptables = 'iptables -w 5'
332        self._command_ping6 = 'ping6'
333        self._command_wpa_cli = 'wpa_cli'
334        self._machine_id = None
335        self._result_dir = result_dir
336        self._conductive = None
337
338        if _is_eureka_host(self.host) and use_wpa_cli:
339            # Look up the WiFi device (and its MAC) on the client.
340            devs = self.iw_runner.list_interfaces(desired_if_type='managed')
341            devs = [dev for dev in devs
342                    if dev.if_name not in self.WIFI_IF_BLOCKLIST]
343            if not devs:
344                raise error.TestFail('No wlan devices found on %s.' %
345                                     self.host.hostname)
346
347            if len(devs) > 1:
348                logging.warning('Warning, found multiple WiFi devices on '
349                                '%s: %r', self.host.hostname, devs)
350            self._wifi_if = devs[0].if_name
351            self._shill_proxy = wpa_cli_proxy.WpaCliProxy(
352                    self.host, self._wifi_if)
353            self._wpa_cli_proxy = self._shill_proxy
354        else:
355            self._shill_proxy = get_xmlrpc_proxy(self.host)
356            interfaces = self._shill_proxy.list_controlled_wifi_interfaces()
357            if not interfaces:
358                logging.debug('No interfaces managed by shill. Rebooting host')
359                self.host.reboot()
360                raise error.TestError('No interfaces managed by shill on %s' %
361                                      self.host.hostname)
362            self._wifi_if = interfaces[0]
363            self._wpa_cli_proxy = wpa_cli_proxy.WpaCliProxy(
364                    self.host, self._wifi_if)
365            self._raise_logging_level()
366        self._interface = interface.Interface(self._wifi_if, host=self.host)
367        self._wpa_mon = wpa_mon.WpaMon(self.host, self.wifi_if)
368        logging.debug('WiFi interface is: %r',
369                      self._interface.device_description)
370        # All tests that use this object assume the interface starts enabled.
371        self.set_device_enabled(self._wifi_if, True)
372        # Turn off powersave mode by default.
373        self.powersave_switch(False)
374        # Invoke the |capabilities| property defined in the parent |Linuxsystem|
375        # to workaround the lazy loading of the capabilities cache and supported
376        # frequency list. This is needed for tests that may need access to these
377        # when the DUT is unreachable (for ex: suspended).
378        #pylint: disable=pointless-statement
379        self.capabilities
380
381
382    def _assert_method_supported(self, method_name):
383        """Raise a TestNAError if the XMLRPC proxy has no method |method_name|.
384
385        @param method_name: string name of method that should exist on the
386                XMLRPC proxy.
387
388        """
389        if not self._supports_method(method_name):
390            raise error.TestNAError('%s() is not supported' % method_name)
391
392
393    def _raise_logging_level(self):
394        """Raises logging levels for WiFi on DUT."""
395        self.host.run('ff_debug --level -2', ignore_status=True)
396        self.host.run('ff_debug +wifi', ignore_status=True)
397
398
399    def is_vht_supported(self):
400        """Returns True if VHT supported; False otherwise"""
401        return self.CAPABILITY_VHT in self.capabilities
402
403
404    def is_5ghz_supported(self):
405        """Returns True if 5Ghz bands are supported; False otherwise."""
406        return self.CAPABILITY_5GHZ in self.capabilities
407
408
409    def is_ibss_supported(self):
410        """Returns True if IBSS mode is supported; False otherwise."""
411        return self.CAPABILITY_IBSS in self.capabilities
412
413
414    def is_frequency_supported(self, frequency):
415        """Returns True if the given frequency is supported; False otherwise.
416
417        @param frequency: int Wifi frequency to check if it is supported by
418                          DUT.
419        """
420        return frequency in self.phys_for_frequency
421
422
423    def _supports_method(self, method_name):
424        """Checks if |method_name| is supported on the remote XMLRPC proxy.
425
426        autotest will, for their own reasons, install python files in the
427        autotest client package that correspond the version of the build
428        rather than the version running on the autotest drone.  This
429        creates situations where we call methods on the client XMLRPC proxy
430        that don't exist in that version of the code.  This detects those
431        situations so that we can degrade more or less gracefully.
432
433        @param method_name: string name of method that should exist on the
434                XMLRPC proxy.
435        @return True if method is available, False otherwise.
436
437        """
438        supported = (_is_eureka_host(self.host)
439                     or method_name in self._shill_proxy.system.listMethods())
440        if not supported:
441            logging.warning('%s() is not supported on older images',
442                            method_name)
443        return supported
444
445
446    def close(self):
447        """Tear down state associated with the client."""
448        self.stop_capture()
449        self.powersave_switch(False)
450        self.shill.clean_profiles()
451        super(WiFiClient, self).close()
452
453    def sync_host_times(self):
454        """Set time on our DUT to match local time."""
455        epoch_seconds = time.time()
456        self.shill.sync_time_to(epoch_seconds)
457
458
459    def collect_debug_info(self, local_save_dir_prefix):
460        """Collect any debug information needed from the DUT
461
462        This invokes the |collect_debug_info| RPC method to trigger
463        bugreport/logcat collection and then transfers the logs to the
464        server.
465
466        @param local_save_dir_prefix Used as a prefix for local save directory.
467        """
468        pass
469
470
471    def check_iw_link_value(self, iw_link_key, desired_value):
472        """Assert that the current wireless link property is |desired_value|.
473
474        @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner.
475        @param desired_value string desired value of iw link property.
476
477        """
478        actual_value = self.get_iw_link_value(iw_link_key)
479        desired_value = str(desired_value)
480        if actual_value != desired_value:
481            raise error.TestFail('Wanted iw link property %s value %s, but '
482                                 'got %s instead.' % (iw_link_key,
483                                                      desired_value,
484                                                      actual_value))
485
486
487    def get_iw_link_value(self, iw_link_key):
488        """Get the current value of a link property for this WiFi interface.
489
490        @param iw_link_key string one of IW_LINK_KEY_* defined in iw_runner.
491
492        """
493        return self.iw_runner.get_link_value(self.wifi_if, iw_link_key)
494
495
496    def powersave_switch(self, turn_on):
497        """Toggle powersave mode for the DUT.
498
499        @param turn_on bool True iff powersave mode should be turned on.
500
501        """
502        mode = 'off'
503        if turn_on:
504            mode = 'on'
505        # Turn ON interface and set power_save option.
506        self.host.run('ifconfig %s up' % self.wifi_if)
507        self.host.run('iw dev %s set power_save %s' % (self.wifi_if, mode))
508
509
510    def timed_scan(self, frequencies, ssids, scan_timeout_seconds=10,
511                   retry_timeout_seconds=10):
512        """Request timed scan to discover given SSIDs.
513
514        This method will retry for a default of |retry_timeout_seconds| until it
515        is able to successfully kick off a scan.  Sometimes, the device on the
516        DUT claims to be busy and rejects our requests. It will raise error
517        if the scan did not complete within |scan_timeout_seconds| or it was
518        not able to discover the given SSIDs.
519
520        @param frequencies list of int WiFi frequencies to scan for.
521        @param ssids list of string ssids to probe request for.
522        @param scan_timeout_seconds: float number of seconds the scan
523                operation not to exceed.
524        @param retry_timeout_seconds: float number of seconds to retry scanning
525                if the interface is busy.  This does not retry if certain
526                SSIDs are missing from the results.
527        @return time in seconds took to complete scan request.
528
529        """
530        # the poll method returns the result of the func
531        scan_result = utils.poll_for_condition(
532                condition=lambda: self.iw_runner.timed_scan(
533                                          self.wifi_if,
534                                          frequencies=frequencies,
535                                          ssids=ssids),
536                exception=error.TestFail('Unable to trigger scan on client'),
537                timeout=retry_timeout_seconds,
538                sleep_interval=0.5)
539        # Verify scan operation completed within given timeout
540        if scan_result.time > scan_timeout_seconds:
541            raise error.TestFail('Scan time %.2fs exceeds the scan timeout' %
542                                 (scan_result.time))
543
544        # Verify all ssids are discovered
545        self.assert_bsses_include_ssids(scan_result.bss_list, ssids)
546
547        logging.info('Wifi scan completed in %.2f seconds', scan_result.time)
548        return scan_result.time
549
550
551    def scan(self, frequencies, ssids, timeout_seconds=10, require_match=True):
552        """Request a scan and (optionally) check that requested SSIDs appear in
553        the results.
554
555        This method will retry for a default of |timeout_seconds| until it is
556        able to successfully kick off a scan.  Sometimes, the device on the DUT
557        claims to be busy and rejects our requests.
558
559        If |ssids| is non-empty, we will speficially probe for those SSIDs.
560
561        If |require_match| is True, we will verify that every element
562        of |ssids| was found in scan results.
563
564        @param frequencies list of int WiFi frequencies to scan for.
565        @param ssids list of string ssids to probe request for.
566        @param timeout_seconds: float number of seconds to retry scanning
567                if the interface is busy.  This does not retry if certain
568                SSIDs are missing from the results.
569        @param require_match: bool True if we must find |ssids|.
570
571        """
572        bss_list = utils.poll_for_condition(
573                condition=lambda: self.iw_runner.scan(
574                        self.wifi_if,
575                        frequencies=frequencies,
576                        ssids=ssids),
577                exception=error.TestFail('Unable to trigger scan on client'),
578                timeout=timeout_seconds,
579                sleep_interval=0.5)
580
581        if require_match:
582            self.assert_bsses_include_ssids(bss_list, ssids)
583
584
585    def wait_for_bss(self, bssid, timeout_seconds=15):
586        """Wait for a specific BSS to appear in the scan results.
587
588        @param bssid: string bssid of AP we expect to see in scan results
589        @param timeout_seconds int seconds to wait for BSSes to be discovered
590
591        """
592        def dut_sees_bss():
593            """Check if a DUT can see a BSS in scan results.
594
595            @return True iff scan results from DUT include the specified BSS.
596
597            """
598            is_requested_bss = lambda iw_bss: iw_bss.bss == bssid
599            scan_results = self.iw_runner.scan(self.wifi_if)
600            return scan_results and list(filter(is_requested_bss, scan_results))
601        try:
602            utils.poll_for_condition(
603                condition=dut_sees_bss,
604                timeout=timeout_seconds,
605                sleep_interval=0.5)
606        except:
607            raise error.TestFail('Failed to discover BSS %s' % bssid)
608
609
610    def wait_for_bsses(self, ssid, num_bss_expected, timeout_seconds=15):
611        """Wait for all BSSes associated with given SSID to be discovered in the
612      scan.
613
614      @param ssid string name of network being queried
615      @param num_bss_expected int number of BSSes expected
616      @param timeout_seconds int seconds to wait for BSSes to be discovered
617
618      """
619        # If the scan returns None, return 0, else return the matching count
620
621        # Wrap num_bss_actual as a mutable object, list, so that an inner function
622        # can update the value without making an assignment to it. Without any
623        # assignment, the inner function will look for the variable in outer scope
624        # instead of creating a new local one.
625        num_bss_actual = [0]
626
627        def are_all_bsses_discovered():
628            """Determine if all BSSes associated with the SSID from parent
629          function are discovered in the scan
630
631          @return boolean representing whether the expected bss count matches
632          how many in the scan match the given ssid
633          """
634            self.claim_wifi_if()  # Stop shill/supplicant scans
635            try:
636                scan_results = self.iw_runner.scan(self.wifi_if,
637                                                   frequencies=[],
638                                                   ssids=[ssid])
639                if scan_results is None:
640                    return False
641                num_bss_actual[0] = sum(ssid == bss.ssid
642                                        for bss in scan_results)
643                return num_bss_expected == num_bss_actual[0]
644            finally:
645                self.release_wifi_if()
646
647        try:
648            utils.poll_for_condition(condition=are_all_bsses_discovered,
649                                     timeout=timeout_seconds,
650                                     sleep_interval=0.5)
651        except utils.TimeoutError:
652            raise error.TestFail('Failed to discover all BSSes. Found %d,'
653                                 ' wanted %d with SSID %s' %
654                                 (num_bss_actual[0], num_bss_expected, ssid))
655
656    def wait_for_service_states(self, ssid, states, timeout_seconds):
657        """Waits for a WiFi service to achieve one of |states|.
658
659        @param ssid string name of network being queried
660        @param states tuple list of states for which the caller is waiting
661        @param timeout_seconds int seconds to wait for a state in |states|
662
663        """
664        logging.info('Waiting for %s to reach one of %r...', ssid, states)
665        success, state, duration  = self._shill_proxy.wait_for_service_states(
666                ssid, states, timeout_seconds)
667        logging.info('...ended up in state \'%s\' (%s) after %f seconds.',
668                     state, 'success' if success else 'failure', duration)
669        return success, state, duration
670
671
672    def do_suspend(self, seconds):
673        """Puts the DUT in suspend power state for |seconds| seconds.
674
675        @param seconds: The number of seconds to suspend the device.
676
677        """
678        logging.info('Suspending DUT for %d seconds...', seconds)
679        self._shill_proxy.do_suspend(seconds)
680        logging.info('...done suspending')
681
682
683    def do_suspend_bg(self, seconds):
684        """Suspend DUT using the power manager - non-blocking.
685
686        @param seconds: The number of seconds to suspend the device.
687
688        """
689        logging.info('Suspending DUT (in background) for %d seconds...',
690                     seconds)
691        self._shill_proxy.do_suspend_bg(seconds)
692
693
694    def flush_bss(self, age=0):
695        """Flush supplicant's cached BSS on the DUT.
696
697        @param age: BSS older than |age| seconds will be removed from the cache.
698        """
699        result = self._wpa_cli_proxy.run_wpa_cli_cmd('bss_flush %d' % age,
700                                                     check_result=False);
701        logging.info('wpa_cli bss_flush %d: out:%r err:%r', age, result.stdout,
702                     result.stderr)
703        return result.stdout, result.stderr
704
705
706    def clear_supplicant_blocklist(self):
707        """Clear's the AP blocklist on the DUT.
708
709        @return stdout and stderror returns passed from wpa_cli command.
710
711        """
712        result = self._wpa_cli_proxy.run_wpa_cli_cmd('bssid_ignore clear',
713                                                     check_result=False)
714        logging.info('wpa_cli bssid_ignore clear: out:%r err:%r',
715                     result.stdout, result.stderr)
716        return result.stdout, result.stderr
717
718
719    def get_active_wifi_SSIDs(self):
720        """Get a list of visible SSID's around the DUT
721
722        @return list of string SSIDs
723
724        """
725        self._assert_method_supported('get_active_wifi_SSIDs')
726        return self._shill_proxy.get_active_wifi_SSIDs()
727
728
729    def set_device_enabled(self, wifi_interface, value,
730                           fail_on_unsupported=False):
731        """Enable or disable the WiFi device.
732
733        @param wifi_interface: string name of interface being modified.
734        @param enabled: boolean; true if this device should be enabled,
735                false if this device should be disabled.
736        @return True if it worked; False, otherwise.
737
738        """
739        if fail_on_unsupported:
740            self._assert_method_supported('set_device_enabled')
741        elif not self._supports_method('set_device_enabled'):
742            return False
743        return self._shill_proxy.set_device_enabled(wifi_interface, value)
744
745
746    def add_arp_entry(self, ip_address, mac_address):
747        """Add an ARP entry to the table associated with the WiFi interface.
748
749        @param ip_address: string IP address associated with the new ARP entry.
750        @param mac_address: string MAC address associated with the new ARP
751                entry.
752
753        """
754        self.host.run('ip neigh add %s lladdr %s dev %s nud perm' %
755                      (ip_address, mac_address, self.wifi_if))
756
757    def add_wake_packet_source(self, source_ip):
758        """Add |source_ip| as a source that can wake us up with packets.
759
760        @param source_ip: IP address from which to wake upon receipt of packets
761
762        @return True if successful, False otherwise.
763
764        """
765        return self._shill_proxy.add_wake_packet_source(
766            self.wifi_if, source_ip)
767
768
769    def remove_wake_packet_source(self, source_ip):
770        """Remove |source_ip| as a source that can wake us up with packets.
771
772        @param source_ip: IP address to stop waking on packets from
773
774        @return True if successful, False otherwise.
775
776        """
777        return self._shill_proxy.remove_wake_packet_source(
778            self.wifi_if, source_ip)
779
780
781    def remove_all_wake_packet_sources(self):
782        """Remove all IPs as sources that can wake us up with packets.
783
784        @return True if successful, False otherwise.
785
786        """
787        return self._shill_proxy.remove_all_wake_packet_sources(self.wifi_if)
788
789
790    def wake_on_wifi_features(self, features):
791        """Shill supports programming the NIC to wake on special kinds of
792        incoming packets, or on changes to the available APs (disconnect,
793        coming in range of a known SSID). This method allows you to configure
794        what wake-on-WiFi mechanisms are active. It returns a context manager,
795        because this is a system-wide setting and we don't want it to persist
796        across different tests.
797
798        If you enable wake-on-packet, then the IPs registered by
799        add_wake_packet_source will be able to wake the system from suspend.
800
801        The correct way to use this method is:
802
803        with client.wake_on_wifi_features(WAKE_ON_WIFI_DARKCONNECT):
804            ...
805
806        @param features: string from the WAKE_ON_WIFI constants above.
807
808        @return a context manager for the features.
809
810        """
811        return TemporaryDeviceDBusProperty(self._shill_proxy,
812                                           self.wifi_if,
813                                           self.WAKE_ON_WIFI_FEATURES,
814                                           features)
815
816
817    def net_detect_scan_period_seconds(self, period):
818        """Sets the period between net detect scans performed by the NIC to look
819        for allowlisted SSIDs to |period|. This setting only takes effect if the
820        NIC is programmed to wake on SSID.
821
822        The correct way to use this method is:
823
824        with client.net_detect_scan_period_seconds(60):
825            ...
826
827        @param period: integer number of seconds between NIC net detect scans
828
829        @return a context manager for the net detect scan period
830
831        """
832        return TemporaryDeviceDBusProperty(self._shill_proxy,
833                                           self.wifi_if,
834                                           self.NET_DETECT_SCAN_PERIOD,
835                                           period)
836
837
838    def wake_to_scan_period_seconds(self, period):
839        """Sets the period between RTC timer wakeups where the system is woken
840        from suspend to perform scans. This setting only takes effect if the
841        NIC is programmed to wake on SSID. Use as with
842        net_detect_scan_period_seconds.
843
844        @param period: integer number of seconds between wake to scan RTC timer
845                wakes.
846
847        @return a context manager for the net detect scan period
848
849        """
850        return TemporaryDeviceDBusProperty(self._shill_proxy,
851                                           self.wifi_if,
852                                           self.WAKE_TO_SCAN_PERIOD,
853                                           period)
854
855
856    def force_wake_to_scan_timer(self, is_forced):
857        """Sets the boolean value determining whether or not to force the use of
858        the wake to scan RTC timer, which wakes the system from suspend
859        periodically to scan for networks.
860
861        @param is_forced: boolean whether or not to force the use of the wake to
862                scan timer
863
864        @return a context manager for the net detect scan period
865
866        """
867        return TemporaryDeviceDBusProperty(self._shill_proxy,
868                                           self.wifi_if,
869                                           self.FORCE_WAKE_TO_SCAN_TIMER,
870                                           is_forced)
871
872
873    def mac_address_randomization(self, enabled):
874        """Sets the boolean value determining whether or not to enable MAC
875        address randomization. This instructs the NIC to randomize the last
876        three octets of the MAC address used in probe requests while
877        disconnected to make the DUT harder to track.
878
879        If MAC address randomization is not supported on this DUT and the
880        caller tries to turn it on, this raises a TestNAError.
881
882        @param enabled: boolean whether or not to enable MAC address
883                randomization
884
885        @return a context manager for the MAC address randomization property
886
887        """
888        if not self._shill_proxy.get_dbus_property_on_device(
889                self.wifi_if, self.MAC_ADDRESS_RANDOMIZATION_SUPPORTED):
890            if enabled:
891                raise error.TestNAError(
892                    'MAC address randomization not supported')
893            else:
894                # Return a no-op context manager.
895                return contextlib.nested()
896
897        return TemporaryDeviceDBusProperty(
898                self._shill_proxy,
899                self.wifi_if,
900                self.MAC_ADDRESS_RANDOMIZATION_ENABLED,
901                enabled)
902
903
904    def request_roam(self, bssid):
905        """Request that we roam to the specified BSSID.
906
907        Note that this operation assumes that:
908
909        1) We're connected to an SSID for which |bssid| is a member.
910        2) There is a BSS with an appropriate ID in our scan results.
911
912        This method does not check for success of either the command or
913        the roaming operation.
914
915        @param bssid: string MAC address of bss to roam to.
916
917        """
918        self._wpa_cli_proxy.run_wpa_cli_cmd('roam %s' % bssid,
919                                            check_result=False);
920        return True
921
922
923    def request_roam_dbus(self, bssid, iface):
924        """Request that we roam to the specified BSSID through dbus.
925
926        Note that this operation assumes that:
927
928        1) We're connected to an SSID for which |bssid| is a member.
929        2) There is a BSS with an appropriate ID in our scan results.
930
931        @param bssid: string MAC address of bss to roam to.
932        @param iface: interface to use
933        @return True if the roam was initiated successfully. Note that this
934                does not guarantee the roam completed successfully.
935
936        """
937        self._assert_method_supported('request_roam_dbus')
938        return self._shill_proxy.request_roam_dbus(bssid, iface)
939
940
941    def wait_for_roam(self, bssid, timeout_seconds=10.0):
942        """Wait for a roam to the given |bssid|.
943
944        @param bssid: string bssid to expect a roam to
945                (e.g.  '00:11:22:33:44:55').
946        @param timeout_seconds: float number of seconds to wait for a roam.
947        @return True if we detect an association to the given |bssid| within
948                |timeout_seconds|.
949
950        """
951        def attempt_roam():
952            """Perform a roam to the given |bssid|
953
954            @return True if there is an assocation between the given |bssid|
955                    and that association is detected within the timeout frame
956            """
957            current_bssid = self.iw_runner.get_current_bssid(self.wifi_if)
958            logging.debug('Current BSSID is %s.', current_bssid)
959            return current_bssid == bssid
960
961        start_time = time.time()
962        duration = 0.0
963        try:
964            success = utils.poll_for_condition(
965                    condition=attempt_roam,
966                    timeout=timeout_seconds,
967                    sleep_interval=0.5,
968                    desc='Wait for a roam to the given bssid')
969        # wait_for_roam should return False on timeout
970        except utils.TimeoutError:
971            success = False
972
973        duration = time.time() - start_time
974        logging.debug('%s to %s in %f seconds.',
975                      'Roamed ' if success else 'Failed to roam ',
976                      bssid,
977                      duration)
978        return success
979
980
981    def wait_for_ssid_vanish(self, ssid):
982        """Wait for shill to notice that there are no BSS's for an SSID present.
983
984        Raise a test failing exception if this does not come to pass.
985
986        @param ssid: string SSID of the network to require be missing.
987
988        """
989        def is_missing_yet():
990            """Determine if the ssid is removed from the service list yet
991
992            @return True if the ssid is not found in the service list
993            """
994            visible_ssids = self.get_active_wifi_SSIDs()
995            logging.info('Got service list: %r', visible_ssids)
996            if ssid not in visible_ssids:
997                return True
998            self.scan(frequencies=[], ssids=[], timeout_seconds=30)
999
1000        utils.poll_for_condition(
1001                condition=is_missing_yet, # func
1002                exception=error.TestFail('shill should mark BSS not present'),
1003                timeout=self.MAX_SERVICE_GONE_TIMEOUT_SECONDS,
1004                sleep_interval=0)
1005
1006    def reassociate(self, timeout_seconds=10):
1007        """Reassociate to the connected network.
1008
1009        @param timeout_seconds: float number of seconds to wait for operation
1010                to complete.
1011
1012        """
1013        logging.info('Attempt to reassociate')
1014        with self.iw_runner.get_event_logger() as logger:
1015            logger.start()
1016            # Issue reattach command to wpa_supplicant
1017            self._wpa_cli_proxy.run_wpa_cli_cmd('reattach');
1018
1019            # Wait for the timeout seconds for association to complete
1020            time.sleep(timeout_seconds)
1021
1022            # Stop iw event logger
1023            logger.stop()
1024
1025            # Get association time based on the iw event log
1026            reassociate_time = logger.get_reassociation_time()
1027            if reassociate_time is None or reassociate_time > timeout_seconds:
1028                raise error.TestFail(
1029                        'Failed to reassociate within given timeout')
1030            logging.info('Reassociate time: %.2f seconds', reassociate_time)
1031
1032
1033    def wait_for_connection(self,
1034                            ssid,
1035                            timeout_seconds=30,
1036                            freq=None,
1037                            ping_ip=None,
1038                            desired_subnet=None,
1039                            source_iface=None):
1040        """Verifies a connection to network ssid, optionally verifying
1041        frequency, ping connectivity and subnet.
1042
1043        @param ssid string ssid of the network to check.
1044        @param timeout_seconds int number of seconds to wait for
1045                connection on the given frequency.
1046        @param freq int frequency of network to check.
1047        @param ping_ip string ip address to ping for verification.
1048        @param desired_subnet string expected subnet in which client
1049                ip address should reside.
1050
1051        @returns a named tuple of (state, time)
1052        """
1053        start_time = time.time()
1054        duration = lambda: time.time() - start_time
1055
1056        # Wrap state as a mutable object, list, so that an inner function can
1057        # update the value without making an assignment to it. Without any
1058        # assignment, the inner function will look for the variable in outer
1059        # scope instead of creating a new local one.
1060        state = [None]
1061        def verify_connection():
1062            """Verify the connection and perform optional operations
1063            as defined in the parent function
1064
1065            @return False if there is a failure in the connection or
1066                    the prescribed verification steps
1067                    The named tuple ConnectTime otherwise, with the state
1068                    and connection time from waiting for service states
1069            """
1070            success, state[0], conn_time = self.wait_for_service_states(
1071                    ssid,
1072                    self.CONNECTED_STATES,
1073                    int(math.ceil(timeout_seconds - duration())))
1074            if not success:
1075                return False
1076
1077            if freq:
1078                actual_freq = self.get_iw_link_value(
1079                        iw_runner.IW_LINK_KEY_FREQUENCY)
1080                if str(freq) != actual_freq:
1081                    logging.debug(
1082                            'Waiting for desired frequency %s (got %s).',
1083                            freq,
1084                            actual_freq)
1085                    return False
1086
1087            if desired_subnet:
1088                actual_subnet = self.wifi_ip_subnet
1089                if actual_subnet != desired_subnet:
1090                    logging.debug(
1091                            'Waiting for desired subnet %s (got %s).',
1092                            desired_subnet,
1093                            actual_subnet)
1094                    return False
1095
1096            if ping_ip:
1097                ping_config = ping_runner.PingConfig(ping_ip,
1098                                                     source_iface=source_iface)
1099                self.ping(ping_config)
1100
1101            return ConnectTime(state[0], conn_time)
1102
1103        freq_error_str = (' on frequency %d Mhz' % freq) if freq else ''
1104
1105        try:
1106            ret = utils.poll_for_condition(
1107                condition=verify_connection,
1108                timeout=timeout_seconds,
1109                sleep_interval=0)
1110        except utils.TimeoutError:
1111            raise error.TestFail(
1112                'Failed to connect to "%s"%s in %f seconds (state=%s)' %
1113                (ssid,
1114                 freq_error_str,
1115                 duration(),
1116                 state[0]))
1117        return ret
1118
1119    @contextmanager
1120    def assert_disconnect_count(self, count):
1121        """Context asserting |count| disconnects for the context lifetime.
1122
1123        Creates an iw logger during the lifetime of the context and asserts
1124        that the client disconnects exactly |count| times.
1125
1126        @param count int the expected number of disconnections.
1127
1128        """
1129        with self.iw_runner.get_event_logger() as logger:
1130            logger.start()
1131            yield
1132            logger.stop()
1133            if logger.get_disconnect_count() != count:
1134                raise error.TestFail(
1135                    'Client disconnected %d times; expected %d' %
1136                    (logger.get_disconnect_count(), count))
1137
1138
1139    def assert_no_disconnects(self):
1140        """Context asserting no disconnects for the context lifetime."""
1141        return self.assert_disconnect_count(0)
1142
1143
1144    @contextmanager
1145    def assert_disconnect_event(self):
1146        """Context asserting at least one disconnect for the context lifetime.
1147
1148        Creates an iw logger during the lifetime of the context and asserts
1149        that the client disconnects at least one time.
1150
1151        """
1152        with self.iw_runner.get_event_logger() as logger:
1153            logger.start()
1154            yield
1155            logger.stop()
1156            if logger.get_disconnect_count() == 0:
1157                raise error.TestFail('Client did not disconnect')
1158
1159
1160    def get_num_card_resets(self):
1161        """Get card reset count."""
1162        reset_msg = '[m]wifiex_sdio_card_reset'
1163        result = self.host.run('grep -c %s /var/log/messages' % reset_msg,
1164                               ignore_status=True)
1165        if result.exit_status == 1:
1166            return 0
1167        count = int(result.stdout.strip())
1168        return count
1169
1170
1171    def get_disconnect_reasons(self):
1172        """Get disconnect reason codes."""
1173        disconnect_reason_msg = "updated DisconnectReason "
1174        disconnect_reason_cleared = "clearing DisconnectReason for "
1175        result = self.host.run('grep -a -E "(%s|%s)" /var/log/net.log' %
1176                               (disconnect_reason_msg,
1177                               disconnect_reason_cleared),
1178                               ignore_status=True)
1179        if result.exit_status == 1:
1180            return None
1181
1182        lines = result.stdout.strip().split('\n')
1183        disconnect_reasons = []
1184        disconnect_reason_regex = re.compile(r' to (\D?\d+)')
1185
1186        found = False
1187        for line in reversed(lines):
1188            match = disconnect_reason_regex.search(line)
1189            if match is not None:
1190                disconnect_reasons.append(match.group(1))
1191                found = True
1192            else:
1193                if (found):
1194                    break
1195        return list(reversed(disconnect_reasons))
1196
1197
1198    def release_wifi_if(self):
1199        """Release the control over the wifi interface back to normal operation.
1200
1201        This will give the ownership of the wifi interface back to shill and
1202        wpa_supplicant.
1203
1204        """
1205        self.set_device_enabled(self._wifi_if, True)
1206
1207
1208    def claim_wifi_if(self):
1209        """Claim the control over the wifi interface from this wifi client.
1210
1211        This claim the ownership of the wifi interface from shill and
1212        wpa_supplicant. The wifi interface should be UP when this call returns.
1213
1214        """
1215        # Disabling a wifi device in shill will remove that device from
1216        # wpa_supplicant as well.
1217        self.set_device_enabled(self._wifi_if, False)
1218
1219        # Wait for shill to bring down the wifi interface.
1220        is_interface_down = lambda: not self._interface.is_up
1221        utils.poll_for_condition(
1222                is_interface_down,
1223                timeout=INTERFACE_DOWN_WAIT_TIME_SECONDS,
1224                sleep_interval=0.5,
1225                desc='Timeout waiting for interface to go down.')
1226        # Bring up the wifi interface to allow the test to use the interface.
1227        self.host.run('%s link set %s up' % (self.cmd_ip, self.wifi_if))
1228
1229
1230    def set_sched_scan(self, enable, fail_on_unsupported=False):
1231        """enable/disable scheduled scan.
1232
1233        @param enable bool flag indicating to enable/disable scheduled scan.
1234
1235        """
1236        if fail_on_unsupported:
1237            self._assert_method_supported('set_sched_scan')
1238        elif not self._supports_method('set_sched_scan'):
1239            return False
1240        return self._shill_proxy.set_sched_scan(enable)
1241
1242
1243    def check_connected_on_last_resume(self):
1244        """Checks whether the DUT was connected on its last resume.
1245
1246        Checks that the DUT was connected after waking from suspend by parsing
1247        the last instance shill log message that reports shill's connection
1248        status on resume. Fails the test if this log message reports that
1249        the DUT woke up disconnected.
1250
1251        """
1252        # As of build R43 6913.0.0, the shill log message from the function
1253        # OnAfterResume is called as soon as shill resumes from suspend, and
1254        # will report whether or not shill is connected. The log message will
1255        # take one of the following two forms:
1256        #
1257        #       [...] [INFO:wifi.cc(1941)] OnAfterResume: connected
1258        #       [...] [INFO:wifi.cc(1941)] OnAfterResume: not connected
1259        #
1260        # where 1941 is an arbitrary PID number. By checking if the last
1261        # instance of this message contains the substring "not connected", we
1262        # can determine whether or not shill was connected on its last resume.
1263        connection_status_log_regex_str = str(r'INFO:wifi\.cc.*OnAfterResume')
1264        not_connected_substr = str(r'not connected')
1265        connected_substr = str(r'connected')
1266
1267        cmd = ('grep -E %s /var/log/net.log | tail -1' %
1268               connection_status_log_regex_str)
1269        connection_status_log = self.host.run(cmd).stdout
1270        if not connection_status_log:
1271            raise error.TestFail('Could not find resume connection status log '
1272                                 'message.')
1273
1274        logging.debug('Connection status message:\n%s', connection_status_log)
1275        if not_connected_substr in connection_status_log:
1276            raise error.TestFail('Client was not connected upon waking from '
1277                                 'suspend.')
1278
1279        if not connected_substr in connection_status_log:
1280            raise error.TestFail('Last resume log message did not contain '
1281                                 'connection status.')
1282
1283        logging.info('Client was connected upon waking from suspend.')
1284
1285
1286    def check_wake_on_wifi_throttled(self):
1287        """
1288        Checks whether wake on WiFi was throttled on the DUT on the last dark
1289        resume. Check for this by parsing shill logs for a throttling message.
1290
1291        """
1292        # We are looking for an dark resume error log message indicating that
1293        # wake on WiFi was throttled. This is an example of the error message:
1294        #     [...] [ERROR:wake_on_wifi.cc(1304)] OnDarkResume: Too many dark \
1295        #       resumes; disabling wake on WiFi temporarily
1296        dark_resume_log_regex_str = str(r'ERROR:wake_on_wifi\.cc.*OnDarkResume:.*')
1297        throttled_msg_substr = ('Too many dark resumes; disabling wake on '
1298                                   'WiFi temporarily')
1299
1300        cmd = ('grep -E %s /var/log/net.log | tail -1' %
1301               dark_resume_log_regex_str)
1302        last_dark_resume_error_log = self.host.run(cmd).stdout
1303        if not last_dark_resume_error_log:
1304            raise error.TestFail('Could not find a dark resume log message.')
1305
1306        logging.debug('Last dark resume log message:\n%s',
1307                last_dark_resume_error_log)
1308        if not throttled_msg_substr in last_dark_resume_error_log:
1309            raise error.TestFail('Wake on WiFi was not throttled on the last '
1310                                 'dark resume.')
1311
1312        logging.info('Wake on WiFi was throttled on the last dark resume.')
1313
1314
1315    def shill_debug_log(self, message):
1316        """Logs a message to the shill log (i.e. /var/log/net.log). This
1317           message will be logged at the DEBUG level.
1318
1319        @param message: the message to be logged.
1320
1321        """
1322        logging.info(message)
1323        logger_command = ('/usr/bin/logger'
1324                          ' --tag shill'
1325                          ' --priority daemon.debug'
1326                          ' "%s"' % utils.sh_escape(message))
1327        self.host.run(logger_command)
1328
1329
1330    def is_wake_on_wifi_supported(self):
1331        """Returns true iff wake-on-WiFi is supported by the DUT."""
1332
1333        if (self.shill.get_dbus_property_on_device(
1334                    self.wifi_if, self.WAKE_ON_WIFI_FEATURES) ==
1335             WAKE_ON_WIFI_NOT_SUPPORTED):
1336            return False
1337        return True
1338
1339
1340    def set_manager_property(self, prop_name, prop_value):
1341        """Sets the given manager property to the value provided.
1342
1343        @param prop_name: the property to be set
1344        @param prop_value: value to assign to the prop_name
1345        @return a context manager for the setting
1346
1347        """
1348        return TemporaryManagerDBusProperty(self._shill_proxy,
1349                                            prop_name,
1350                                            prop_value)
1351
1352
1353class TemporaryDeviceDBusProperty:
1354    """Utility class to temporarily change a dbus property for the WiFi device.
1355
1356    Since dbus properties are global and persistent settings, we want
1357    to make sure that we change them back to what they were before the test
1358    started.
1359
1360    """
1361
1362    def __init__(self, shill_proxy, iface, prop_name, value):
1363        """Construct a TemporaryDeviceDBusProperty context manager.
1364
1365
1366        @param shill_proxy: the shill proxy to use to communicate via dbus
1367        @param iface: device whose property to change (e.g. 'wlan0')
1368        @param prop_name: the name of the property we want to set
1369        @param value: the desired value of the property
1370
1371        """
1372        self._shill = shill_proxy
1373        self._interface = iface
1374        self._prop_name = prop_name
1375        self._value = value
1376        self._saved_value = None
1377
1378
1379    def __enter__(self):
1380        logging.info('- Setting property %s on device %s',
1381                     self._prop_name,
1382                     self._interface)
1383
1384        self._saved_value = self._shill.get_dbus_property_on_device(
1385                self._interface, self._prop_name)
1386        if self._saved_value is None:
1387            raise error.TestFail('Device or property not found.')
1388        if not self._shill.set_dbus_property_on_device(self._interface,
1389                                                       self._prop_name,
1390                                                       self._value):
1391            raise error.TestFail('Could not set property')
1392
1393        logging.info('- Changed value from %s to %s',
1394                     self._saved_value,
1395                     self._value)
1396
1397
1398    def __exit__(self, exception, value, traceback):
1399        logging.info('- Resetting property %s', self._prop_name)
1400
1401        if not self._shill.set_dbus_property_on_device(self._interface,
1402                                                       self._prop_name,
1403                                                       self._saved_value):
1404            raise error.TestFail('Could not reset property')
1405
1406
1407class TemporaryManagerDBusProperty:
1408    """Utility class to temporarily change a Manager dbus property.
1409
1410    Since dbus properties are global and persistent settings, we want
1411    to make sure that we change them back to what they were before the test
1412    started.
1413
1414    """
1415
1416    def __init__(self, shill_proxy, prop_name, value):
1417        """Construct a TemporaryManagerDBusProperty context manager.
1418
1419        @param shill_proxy: the shill proxy to use to communicate via dbus
1420        @param prop_name: the name of the property we want to set
1421        @param value: the desired value of the property
1422
1423        """
1424        self._shill = shill_proxy
1425        self._prop_name = prop_name
1426        self._value = value
1427        self._saved_value = None
1428
1429
1430    def __enter__(self):
1431        logging.info('- Setting Manager property: %s', self._prop_name)
1432
1433        self._saved_value = self._shill.get_manager_property(
1434                self._prop_name)
1435        if self._saved_value is None:
1436            self._saved_value = ""
1437            if not self._shill.set_optional_manager_property(self._prop_name,
1438                                                             self._value):
1439                raise error.TestFail('Could not set optional manager property.')
1440        else:
1441            setprop_result = self._shill.set_manager_property(self._prop_name,
1442                                                              self._value)
1443            if not setprop_result:
1444                raise error.TestFail('Could not set manager property')
1445
1446        logging.info('- Changed value from [%s] to [%s]',
1447                     self._saved_value,
1448                     self._value)
1449
1450
1451    def __exit__(self, exception, value, traceback):
1452        logging.info('- Resetting property %s to [%s]',
1453                     self._prop_name,
1454                     self._saved_value)
1455
1456        if not self._shill.set_manager_property(self._prop_name,
1457                                                self._saved_value):
1458            raise error.TestFail('Could not reset manager property')
1459