• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Server side bluetooth adapter subtests."""
6
7import inspect
8import functools
9import logging
10import re
11import time
12
13from autotest_lib.client.bin import utils
14from autotest_lib.client.bin.input import input_event_recorder as recorder
15from autotest_lib.client.common_lib import error
16from autotest_lib.server import test
17from autotest_lib.client.bin.input.linux_input import (
18        BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL)
19
20
21REBOOTING_CHAMELEON = False
22
23Event = recorder.Event
24
25
26# Delay binding the methods since host is only available at run time.
27SUPPORTED_DEVICE_TYPES = {
28    'MOUSE': lambda host: host.chameleon.get_bluetooth_hid_mouse,
29    'LE_MOUSE': lambda host: host.chameleon.get_bluetooth_hog_mouse,
30    'BLE_MOUSE': lambda host: host.chameleon.get_ble_mouse,
31}
32
33
34def method_name():
35    """Get the method name of a class.
36
37    This function is supposed to be invoked inside a class and will
38    return current method name who invokes this function.
39
40    @returns: the string of the method name inside the class.
41    """
42    return inspect.getouterframes(inspect.currentframe())[1][3]
43
44
45def _run_method(method, method_name, *args, **kwargs):
46    """Run a target method and capture exceptions if any.
47
48    This is just a wrapper of the target method so that we do not need to
49    write the exception capturing structure repeatedly. The method could
50    be either a device method or a facade method.
51
52    @param method: the method to run
53    @param method_name: the name of the method
54
55    @returns: the return value of target method() if successful.
56              False otherwise.
57
58    """
59    result = False
60    try:
61        result = method(*args, **kwargs)
62    except Exception as e:
63        logging.error('%s: %s', method_name, e)
64    except:
65        logging.error('%s: unexpected error', method_name)
66    return result
67
68
69def get_bluetooth_emulated_device(host, device_type):
70    """Get the bluetooth emulated device object.
71
72    @param host: the DUT, usually a chromebook
73    @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
74
75    @returns: the bluetooth device object
76
77    """
78
79    def _retry_device_method(method_name, legal_falsy_values=[]):
80        """retry the emulated device's method.
81
82        The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().
83
84        Note that the method name string is provided to get the device's actual
85        method object at run time through getattr(). The rebinding is required
86        because a new device may have been created previously or during the
87        execution of fix_serial_device().
88
89        Given a device's method, it is not feasible to get the method name
90        through __name__ attribute. This limitation is due to the fact that
91        the device is a dotted object of an XML RPC server proxy.
92        As an example, with the method name 'GetAdvertisedName', we could
93        derive the correspoinding method device.GetAdvertisedName. On the
94        contrary, given device.GetAdvertisedName, it is not feasible to get the
95        method name by device.GetAdvertisedName.__name__
96
97        Also note that if the device method fails at the first time, we would
98        try to fix the problem by re-creating the serial device and see if the
99        problem is fixed. If not, we will reboot the chameleon board and see
100        if the problem is fixed. If yes, execute the target method the second
101        time.
102
103        The default values exist for uses of this function before the options
104        were added, ideally we should change zero_ok to False.
105
106        @param method_name: the string of the method name.
107        @param legal_falsy_values: Values that are falsy but might be OK.
108
109        @returns: the result returned by the device's method.
110
111        """
112        result = _run_method(getattr(device, method_name), method_name)
113        if _is_successful(result, legal_falsy_values):
114            return result
115
116        logging.error('%s failed the 1st time. Try to fix the serial device.',
117                      method_name)
118
119        # Try to fix the serial device if possible.
120        if not fix_serial_device(host, device):
121            return False
122
123        logging.info('%s: retry the 2nd time.', method_name)
124        return _run_method(getattr(device, method_name), method_name)
125
126
127    if device_type not in SUPPORTED_DEVICE_TYPES:
128        raise error.TestError('The device type is not supported: %s',
129                              device_type)
130
131    # Get the bluetooth device object and query some important properties.
132    device = SUPPORTED_DEVICE_TYPES[device_type](host)()
133
134    # Get some properties of the kit
135    # NOTE: Strings updated here must be kept in sync with Chameleon.
136    device._capabilities = _retry_device_method('GetCapabilities')
137    device._transports = device._capabilities["CAP_TRANSPORTS"]
138    device._is_le_only = ("TRANSPORT_LE" in device._transports and
139                          len(device._transports) == 1)
140    device._has_pin = device._capabilities["CAP_HAS_PIN"]
141    device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]
142
143    _retry_device_method('Init')
144    logging.info('device type: %s', device_type)
145
146    device.name = _retry_device_method('GetAdvertisedName')
147    logging.info('device name: %s', device.name)
148
149    device.address = _retry_device_method('GetLocalBluetoothAddress')
150    logging.info('address: %s', device.address)
151
152    pin_falsy_values = [] if device._has_pin else [None]
153    device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
154    logging.info('pin: %s', device.pin)
155
156    class_falsy_values = [None] if device._is_le_only else [0]
157
158    # Class of service is None for LE-only devices. Don't fail or parse it.
159    device.class_of_service = _retry_device_method('GetClassOfService',
160                                                   class_falsy_values)
161    if device._is_le_only:
162      parsed_class_of_service = device.class_of_service
163    else:
164      parsed_class_of_service = "0x%04X" % device.class_of_service
165    logging.info('class of service: %s', parsed_class_of_service)
166
167    device.class_of_device = _retry_device_method('GetClassOfDevice',
168                                                  class_falsy_values)
169    # Class of device is None for LE-only devices. Don't fail or parse it.
170    if device._is_le_only:
171      parsed_class_of_device = device.class_of_device
172    else:
173      parsed_class_of_device = "0x%04X" % device.class_of_device
174    logging.info('class of device: %s', parsed_class_of_device)
175
176    device.device_type = _retry_device_method('GetHIDDeviceType')
177    logging.info('device type: %s', device.device_type)
178
179    device.authentication_mode = None
180    if not device._is_le_only:
181      device.authentication_mode = _retry_device_method('GetAuthenticationMode')
182      logging.info('authentication mode: %s', device.authentication_mode)
183
184    device.port = _retry_device_method('GetPort')
185    logging.info('serial port: %s\n', device.port)
186
187    return device
188
189
190def recreate_serial_device(device):
191    """Create and connect to a new serial device.
192
193    @param device: the bluetooth HID device
194
195    @returns: True if the serial device is re-created successfully.
196
197    """
198    logging.info('Remove the old serial device and create a new one.')
199    if device is not None:
200        try:
201            device.Close()
202        except:
203            logging.error('failed to close the serial device.')
204            return False
205    try:
206        device.CreateSerialDevice()
207        return True
208    except:
209        logging.error('failed to invoke CreateSerialDevice.')
210        return False
211
212
213def _reboot_chameleon(host, device):
214    REBOOT_SLEEP_SECS = 40
215
216    if not REBOOTING_CHAMELEON:
217        logging.info('Skip rebooting chameleon.')
218        return False
219
220    # Close the bluetooth peripheral device and reboot the chameleon board.
221    device.Close()
222    logging.info('rebooting chameleon...')
223    host.chameleon.reboot()
224
225    # Every chameleon reboot would take a bit more than REBOOT_SLEEP_SECS.
226    # Sleep REBOOT_SLEEP_SECS and then begin probing the chameleon board.
227    time.sleep(REBOOT_SLEEP_SECS)
228
229    # Check if the serial device could initialize, connect, and
230    # enter command mode correctly.
231    logging.info('Checking device status...')
232    if not _run_method(device.Init, 'Init'):
233        logging.info('device.Init: failed after reboot')
234        return False
235    if not device.CheckSerialConnection():
236        logging.info('device.CheckSerialConnection: failed after reboot')
237        return False
238    if not _run_method(device.EnterCommandMode, 'EnterCommandMode'):
239        logging.info('device.EnterCommandMode: failed after reboot')
240        return False
241    logging.info('The device is created successfully after reboot.')
242    return True
243
244
245def _is_successful(result, legal_falsy_values=[]):
246    """Is the method result considered successful?
247
248    Some method results, for example that of class_of_service, may be 0 which is
249    considered a valid result. Occassionally, None is acceptable.
250
251    The default values exist for uses of this function before the options were
252    added, ideally we should change zero_ok to False.
253
254    @param result: a method result
255    @param legal_falsy_values: Values that are falsy but might be OK.
256
257    @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if
258              result is None and none_ok.
259    """
260    truthiness_of_result = bool(result)
261    return truthiness_of_result or result in legal_falsy_values
262
263
264def fix_serial_device(host, device):
265    """Fix the serial device.
266
267    This function tries to fix the serial device by
268    (1) re-creating a serial device, or
269    (2) rebooting the chameleon board.
270
271    @param host: the DUT, usually a chromebook
272    @param device: the bluetooth HID device
273
274    @returns: True if the serial device is fixed. False otherwise.
275
276    """
277    # Check the serial connection. Fix it if needed.
278    if device.CheckSerialConnection():
279        # The USB serial connection still exists.
280        # Re-connection suffices to solve the problem. The problem
281        # is usually caused by serial port change. For example,
282        # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1.
283        logging.info('retry: creating a new serial device...')
284        if not recreate_serial_device(device):
285            return False
286
287    # Check if recreate_serial_device() above fixes the problem.
288    # If not, reboot the chameleon board including creation of a new
289    # bluetooth device. Check if reboot fixes the problem.
290    # If not, return False.
291    result = _run_method(device.EnterCommandMode, 'EnterCommandMode')
292    return _is_successful(result) or _reboot_chameleon(host, device)
293
294
295def retry(test_method, instance, *args, **kwargs):
296    """Execute the target facade test_method(). Retry if failing the first time.
297
298    A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
299    e.g., BluetoothAdapterTests.test_bluetoothd_running().
300
301    @param test_method: the test method to retry
302
303    @returns: True if the return value of test_method() is successful.
304              False otherwise.
305
306    """
307    if _is_successful(_run_method(test_method, test_method.__name__,
308                                  instance, *args, **kwargs)):
309        return True
310
311    # Try to fix the serial device if applicable.
312    logging.error('%s failed at the 1st time.', test_method.__name__)
313
314    # If this test does not use any attached serial device, just re-run
315    # the test.
316    logging.info('%s: retry the 2nd time.', test_method.__name__)
317    time.sleep(1)
318    if not hasattr(instance, 'device_type'):
319        return _is_successful(_run_method(test_method, test_method.__name__,
320                                          instance, *args, **kwargs))
321
322    host = instance.host
323    device = instance.devices[instance.device_type]
324    if not fix_serial_device(host, device):
325        return False
326
327    logging.info('%s: retry the 2nd time.', test_method.__name__)
328    return _is_successful(_run_method(test_method, test_method.__name__,
329                                      instance, *args, **kwargs))
330
331
332def _test_retry_and_log(test_method_or_retry_flag):
333    """A decorator that logs test results, collects error messages, and retries
334       on request.
335
336    @param test_method_or_retry_flag: either the test_method or a retry_flag.
337        There are some possibilities of this argument:
338        1. the test_method to conduct and retry: should retry the test_method.
339            This occurs with
340            @_test_retry_and_log
341        2. the retry flag is True. Should retry the test_method.
342            This occurs with
343            @_test_retry_and_log(True)
344        3. the retry flag is False. Do not retry the test_method.
345            This occurs with
346            @_test_retry_and_log(False)
347
348    @returns: a wrapper of the test_method with test log. The retry mechanism
349        would depend on the retry flag.
350
351    """
352
353    def decorator(test_method):
354        """A decorator wrapper of the decorated test_method.
355
356        @param test_method: the test method being decorated.
357
358        @returns the wrapper of the test method.
359
360        """
361        @functools.wraps(test_method)
362        def wrapper(instance, *args, **kwargs):
363            """A wrapper of the decorated method.
364
365            @param instance: an BluetoothAdapterTests instance
366
367            @returns the result of the test method
368
369            """
370            instance.results = None
371            if callable(test_method_or_retry_flag) or test_method_or_retry_flag:
372                test_result = retry(test_method, instance, *args, **kwargs)
373            else:
374                test_result = test_method(instance, *args, **kwargs)
375
376            if test_result:
377                logging.info('[*** passed: %s]', test_method.__name__)
378            else:
379                fail_msg = '[--- failed: %s (%s)]' % (test_method.__name__,
380                                                      str(instance.results))
381                logging.error(fail_msg)
382                instance.fails.append(fail_msg)
383            return test_result
384        return wrapper
385
386    if callable(test_method_or_retry_flag):
387        # If the decorator function comes with no argument like
388        # @_test_retry_and_log
389        return decorator(test_method_or_retry_flag)
390    else:
391        # If the decorator function comes with an argument like
392        # @_test_retry_and_log(False)
393        return decorator
394
395
396def test_case_log(method):
397    """A decorator for test case methods.
398
399    The main purpose of this decorator is to display the test case name
400    in the test log which looks like
401
402        <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
403
404    @param method: the test case method to decorate.
405
406    @returns: a wrapper function of the decorated method.
407
408    """
409    @functools.wraps(method)
410    def wrapper(instance, *args, **kwargs):
411        """Log the name of the wrapped method before execution"""
412        logging.info('\n<... %s ...>', method.__name__)
413        method(instance, *args, **kwargs)
414    return wrapper
415
416
417class BluetoothAdapterTests(test.test):
418    """Server side bluetooth adapter tests.
419
420    This test class tries to thoroughly verify most of the important work
421    states of a bluetooth adapter.
422
423    The various test methods are supposed to be invoked by actual autotest
424    tests such as server/cros/site_tests/bluetooth_Adapter*.
425
426    """
427    version = 1
428    ADAPTER_ACTION_SLEEP_SECS = 1
429    ADAPTER_PAIRING_TIMEOUT_SECS = 60
430    ADAPTER_CONNECTION_TIMEOUT_SECS = 30
431    ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
432    ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
433    ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
434    ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
435    ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
436    # TODO(shijinabraham@) Remove when crbug/905374 is fixed
437    ADAPTER_STOP_DISCOVERY_TIMEOUT_SECS = 180
438
439    ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
440    ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1
441
442    HID_REPORT_SLEEP_SECS = 1
443
444    # Default suspend time in seconds for suspend resume.
445    SUSPEND_TIME_SECS=10
446
447    # hci0 is the default hci device if there is no external bluetooth dongle.
448    EXPECTED_HCI = 'hci0'
449
450    CLASS_OF_SERVICE_MASK = 0xFFE000
451    CLASS_OF_DEVICE_MASK = 0x001FFF
452
453    # Constants about advertising.
454    DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280
455    DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280
456    ADVERTISING_INTERVAL_UNIT = 0.625
457
458    # Error messages about advertising dbus methods.
459    ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
460            'org.bluez.Error.Failed: Failed to register advertisement')
461    ERROR_INVALID_ADVERTISING_INTERVALS = (
462            'org.bluez.Error.InvalidArguments: Invalid arguments')
463
464    # Supported profiles by chrome os.
465    SUPPORTED_UUIDS = {
466            'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb',
467            'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
468            'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
469            'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
470            'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
471            'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}
472
473
474    def get_device(self, device_type):
475        """Get the bluetooth device object.
476
477        @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
478
479        @returns: the bluetooth device object
480
481        """
482        self.device_type = device_type
483        if self.devices[device_type] is None:
484            self.devices[device_type] = get_bluetooth_emulated_device(
485                    self.host, device_type)
486        return self.devices[device_type]
487
488
489    def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
490        """Suspend the DUT for a while and then resume.
491
492        @param suspend_time: the suspend time in secs
493
494        """
495        logging.info('The DUT suspends for %d seconds...', suspend_time)
496        try:
497            self.host.suspend(suspend_time=suspend_time)
498        except error.AutoservSuspendError:
499            logging.error('The DUT did not suspend for %d seconds', suspend_time)
500            pass
501        logging.info('The DUT is waken up.')
502
503
504    def _wait_for_condition(self, func, method_name,
505                            timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
506                            sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS):
507        """Wait for the func() to become True.
508
509        @param fun: the function to wait for.
510        @param method_name: the invoking class method.
511        @param timeout: number of seconds to wait before giving up.
512        @param sleep_interval: the interval in seconds to sleep between
513                invoking func().
514
515        @returns: the bluetooth device object
516
517        """
518
519        try:
520            utils.poll_for_condition(condition=func,
521                                     timeout=timeout,
522                                     sleep_interval=sleep_interval,
523                                     desc=('Waiting %s' % method_name))
524            return True
525        except utils.TimeoutError as e:
526            logging.error('%s: %s', method_name, e)
527        except Exception as e:
528            logging.error('%s: %s', method_name, e)
529            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
530            logging.error(err)
531        except:
532            logging.error('%s: unexpected error', method_name)
533        return False
534
535
536    # -------------------------------------------------------------------
537    # Adater standalone tests
538    # -------------------------------------------------------------------
539
540
541    @_test_retry_and_log
542    def test_bluetoothd_running(self):
543        """Test that bluetoothd is running."""
544        return self.bluetooth_facade.is_bluetoothd_running()
545
546
547    @_test_retry_and_log
548    def test_start_bluetoothd(self):
549        """Test that bluetoothd could be started successfully."""
550        return self.bluetooth_facade.start_bluetoothd()
551
552
553    @_test_retry_and_log
554    def test_stop_bluetoothd(self):
555        """Test that bluetoothd could be stopped successfully."""
556        return self.bluetooth_facade.stop_bluetoothd()
557
558
559    @_test_retry_and_log
560    def test_adapter_work_state(self):
561        """Test that the bluetooth adapter is in the correct working state.
562
563        This includes that the adapter is detectable, is powered on,
564        and its hci device is hci0.
565        """
566        has_adapter = self.bluetooth_facade.has_adapter()
567        is_powered_on = self._wait_for_condition(
568                self.bluetooth_facade.is_powered_on, method_name())
569        hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
570        self.results = {
571                'has_adapter': has_adapter,
572                'is_powered_on': is_powered_on,
573                'hci': hci}
574        return all(self.results.values())
575
576
577    @_test_retry_and_log
578    def test_power_on_adapter(self):
579        """Test that the adapter could be powered on successfully."""
580        power_on = self.bluetooth_facade.set_powered(True)
581        is_powered_on = self._wait_for_condition(
582                self.bluetooth_facade.is_powered_on, method_name())
583
584        self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
585        return all(self.results.values())
586
587
588    @_test_retry_and_log
589    def test_power_off_adapter(self):
590        """Test that the adapter could be powered off successfully."""
591        power_off = self.bluetooth_facade.set_powered(False)
592        is_powered_off = self._wait_for_condition(
593                lambda: not self.bluetooth_facade.is_powered_on(),
594                method_name())
595
596        self.results = {
597                'power_off': power_off,
598                'is_powered_off': is_powered_off}
599        return all(self.results.values())
600
601
602    @_test_retry_and_log
603    def test_reset_on_adapter(self):
604        """Test that the adapter could be reset on successfully.
605
606        This includes restarting bluetoothd, and removing the settings
607        and cached devices.
608        """
609        reset_on = self.bluetooth_facade.reset_on()
610        is_powered_on = self._wait_for_condition(
611                self.bluetooth_facade.is_powered_on, method_name())
612
613        self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
614        return all(self.results.values())
615
616
617    @_test_retry_and_log
618    def test_reset_off_adapter(self):
619        """Test that the adapter could be reset off successfully.
620
621        This includes restarting bluetoothd, and removing the settings
622        and cached devices.
623        """
624        reset_off = self.bluetooth_facade.reset_off()
625        is_powered_off = self._wait_for_condition(
626                lambda: not self.bluetooth_facade.is_powered_on(),
627                method_name())
628
629        self.results = {
630                'reset_off': reset_off,
631                'is_powered_off': is_powered_off}
632        return all(self.results.values())
633
634
635    @_test_retry_and_log
636    def test_UUIDs(self):
637        """Test that basic profiles are supported."""
638        adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
639        self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
640                        if uuid not in adapter_UUIDs]
641        return not bool(self.results)
642
643
644    @_test_retry_and_log
645    def test_start_discovery(self):
646        """Test that the adapter could start discovery."""
647        start_discovery = self.bluetooth_facade.start_discovery()
648        is_discovering = self._wait_for_condition(
649                self.bluetooth_facade.is_discovering, method_name())
650
651        self.results = {
652                'start_discovery': start_discovery,
653                'is_discovering': is_discovering}
654        return all(self.results.values())
655
656
657    @_test_retry_and_log
658    def test_stop_discovery(self):
659        """Test that the adapter could stop discovery."""
660        stop_discovery = self.bluetooth_facade.stop_discovery()
661        is_not_discovering = self._wait_for_condition(
662                lambda: not self.bluetooth_facade.is_discovering(),
663                method_name(),
664                timeout=self.ADAPTER_STOP_DISCOVERY_TIMEOUT_SECS)
665
666        self.results = {
667                'stop_discovery': stop_discovery,
668                'is_not_discovering': is_not_discovering}
669        return all(self.results.values())
670
671
672    @_test_retry_and_log
673    def test_discoverable(self):
674        """Test that the adapter could be set discoverable."""
675        set_discoverable = self.bluetooth_facade.set_discoverable(True)
676        is_discoverable = self._wait_for_condition(
677                self.bluetooth_facade.is_discoverable, method_name())
678
679        self.results = {
680                'set_discoverable': set_discoverable,
681                'is_discoverable': is_discoverable}
682        return all(self.results.values())
683
684
685    @_test_retry_and_log
686    def test_nondiscoverable(self):
687        """Test that the adapter could be set non-discoverable."""
688        set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
689        is_nondiscoverable = self._wait_for_condition(
690                lambda: not self.bluetooth_facade.is_discoverable(),
691                method_name())
692
693        self.results = {
694                'set_nondiscoverable': set_nondiscoverable,
695                'is_nondiscoverable': is_nondiscoverable}
696        return all(self.results.values())
697
698
699    @_test_retry_and_log
700    def test_pairable(self):
701        """Test that the adapter could be set pairable."""
702        set_pairable = self.bluetooth_facade.set_pairable(True)
703        is_pairable = self._wait_for_condition(
704                self.bluetooth_facade.is_pairable, method_name())
705
706        self.results = {
707                'set_pairable': set_pairable,
708                'is_pairable': is_pairable}
709        return all(self.results.values())
710
711
712    @_test_retry_and_log
713    def test_nonpairable(self):
714        """Test that the adapter could be set non-pairable."""
715        set_nonpairable = self.bluetooth_facade.set_pairable(False)
716        is_nonpairable = self._wait_for_condition(
717                lambda: not self.bluetooth_facade.is_pairable(), method_name())
718
719        self.results = {
720                'set_nonpairable': set_nonpairable,
721                'is_nonpairable': is_nonpairable}
722        return all(self.results.values())
723
724
725    # -------------------------------------------------------------------
726    # Tests about general discovering, pairing, and connection
727    # -------------------------------------------------------------------
728
729
730    @_test_retry_and_log
731    def test_discover_device(self, device_address):
732        """Test that the adapter could discover the specified device address.
733
734        @param device_address: Address of the device.
735
736        @returns: True if the device is found. False otherwise.
737
738        """
739        has_device_initially = False
740        start_discovery = False
741        device_discovered = False
742        has_device = self.bluetooth_facade.has_device
743
744        if has_device(device_address):
745            has_device_initially = True
746        elif self.bluetooth_facade.start_discovery():
747            start_discovery = True
748            try:
749                utils.poll_for_condition(
750                        condition=(lambda: has_device(device_address)),
751                        timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
752                        sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
753                        desc='Waiting for discovering %s' % device_address)
754                device_discovered = True
755            except utils.TimeoutError as e:
756                logging.error('test_discover_device: %s', e)
757            except Exception as e:
758                logging.error('test_discover_device: %s', e)
759                err = 'bluetoothd probably crashed. Check out /var/log/messages'
760                logging.error(err)
761            except:
762                logging.error('test_discover_device: unexpected error')
763
764        self.results = {
765                'has_device_initially': has_device_initially,
766                'start_discovery': start_discovery,
767                'device_discovered': device_discovered}
768        return has_device_initially or device_discovered
769
770
771    @_test_retry_and_log
772    def test_pairing(self, device_address, pin, trusted=True):
773        """Test that the adapter could pair with the device successfully.
774
775        @param device_address: Address of the device.
776        @param pin: pin code to pair with the device.
777        @param trusted: indicating whether to set the device trusted.
778
779        @returns: True if pairing succeeds. False otherwise.
780
781        """
782
783        def _pair_device():
784            """Pair to the device.
785
786            @returns: True if it could pair with the device. False otherwise.
787
788            """
789            return self.bluetooth_facade.pair_legacy_device(
790                    device_address, pin, trusted,
791                    self.ADAPTER_PAIRING_TIMEOUT_SECS)
792
793
794        has_device = False
795        paired = False
796        if self.bluetooth_facade.has_device(device_address):
797            has_device = True
798            try:
799                utils.poll_for_condition(
800                        condition=_pair_device,
801                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
802                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
803                        desc='Waiting for pairing %s' % device_address)
804                paired = True
805            except utils.TimeoutError as e:
806                logging.error('test_pairing: %s', e)
807            except:
808                logging.error('test_pairing: unexpected error')
809
810        self.results = {'has_device': has_device, 'paired': paired}
811        return all(self.results.values())
812
813
814    @_test_retry_and_log
815    def test_remove_pairing(self, device_address):
816        """Test that the adapter could remove the paired device.
817
818        @param device_address: Address of the device.
819
820        @returns: True if the device is removed successfully. False otherwise.
821
822        """
823        device_is_paired_initially = self.bluetooth_facade.device_is_paired(
824                device_address)
825        remove_pairing = False
826        pairing_removed = False
827
828        if device_is_paired_initially:
829            remove_pairing = self.bluetooth_facade.remove_device_object(
830                    device_address)
831            pairing_removed = not self.bluetooth_facade.device_is_paired(
832                    device_address)
833
834        self.results = {
835                'device_is_paired_initially': device_is_paired_initially,
836                'remove_pairing': remove_pairing,
837                'pairing_removed': pairing_removed}
838        return all(self.results.values())
839
840
841    def test_set_trusted(self, device_address, trusted=True):
842        """Test whether the device with the specified address is trusted.
843
844        @param device_address: Address of the device.
845        @param trusted : True or False indicating if trusted is expected.
846
847        @returns: True if the device's "Trusted" property is as specified;
848                  False otherwise.
849
850        """
851
852        set_trusted = self.bluetooth_facade.set_trusted(
853                device_address, trusted)
854
855        properties = self.bluetooth_facade.get_device_properties(
856                device_address)
857        actual_trusted = properties.get('Trusted')
858
859        self.results = {
860                'set_trusted': set_trusted,
861                'actual trusted': actual_trusted,
862                'expected trusted': trusted}
863        return actual_trusted == trusted
864
865
866    @_test_retry_and_log
867    def test_connection_by_adapter(self, device_address):
868        """Test that the adapter of dut could connect to the device successfully
869
870        It is the caller's responsibility to pair to the device before
871        doing connection.
872
873        @param device_address: Address of the device.
874
875        @returns: True if connection is performed. False otherwise.
876
877        """
878
879        def _connect_device():
880            """Connect to the device.
881
882            @returns: True if it could connect to the device. False otherwise.
883
884            """
885            return self.bluetooth_facade.connect_device(device_address)
886
887
888        has_device = False
889        connected = False
890        if self.bluetooth_facade.has_device(device_address):
891            has_device = True
892            try:
893                utils.poll_for_condition(
894                        condition=_connect_device,
895                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
896                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
897                        desc='Waiting for connecting to %s' % device_address)
898                connected = True
899            except utils.TimeoutError as e:
900                logging.error('test_connection_by_adapter: %s', e)
901            except:
902                logging.error('test_connection_by_adapter: unexpected error')
903
904        self.results = {'has_device': has_device, 'connected': connected}
905        return all(self.results.values())
906
907
908    @_test_retry_and_log
909    def test_disconnection_by_adapter(self, device_address):
910        """Test that the adapter of dut could disconnect the device successfully
911
912        @param device_address: Address of the device.
913
914        @returns: True if disconnection is performed. False otherwise.
915
916        """
917        return self.bluetooth_facade.disconnect_device(device_address)
918
919
920    def _enter_command_mode(self, device):
921        """Let the device enter command mode.
922
923        Before using the device, need to call this method to make sure
924        it is in the command mode.
925
926        @param device: the bluetooth HID device
927
928        @returns: True if successful. False otherwise.
929
930        """
931        result = _is_successful(_run_method(device.EnterCommandMode,
932                                            'EnterCommandMode'))
933        if not result:
934            logging.error('EnterCommandMode failed')
935        return result
936
937
938    @_test_retry_and_log
939    def test_connection_by_device(self, device):
940        """Test that the device could connect to the adapter successfully.
941
942        This emulates the behavior that a device may initiate a
943        connection request after waking up from power saving mode.
944
945        @param device: the bluetooth HID device
946
947        @returns: True if connection is performed correctly by device and
948                  the adapter also enters connection state.
949                  False otherwise.
950
951        """
952        if not self._enter_command_mode(device):
953            return False
954
955        method_name = 'test_connection_by_device'
956        connection_by_device = False
957        adapter_address = self.bluetooth_facade.address
958        try:
959            device.ConnectToRemoteAddress(adapter_address)
960            connection_by_device = True
961        except Exception as e:
962            logging.error('%s (device): %s', method_name, e)
963        except:
964            logging.error('%s (device): unexpected error', method_name)
965
966        connection_seen_by_adapter = False
967        device_address = device.address
968        device_is_connected = self.bluetooth_facade.device_is_connected
969        try:
970            utils.poll_for_condition(
971                    condition=lambda: device_is_connected(device_address),
972                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
973                    desc=('Waiting for connection from %s' % device_address))
974            connection_seen_by_adapter = True
975        except utils.TimeoutError as e:
976            logging.error('%s (adapter): %s', method_name, e)
977        except:
978            logging.error('%s (adapter): unexpected error', method_name)
979
980        self.results = {
981                'connection_by_device': connection_by_device,
982                'connection_seen_by_adapter': connection_seen_by_adapter}
983        return all(self.results.values())
984
985
986    @_test_retry_and_log
987    def test_disconnection_by_device(self, device):
988        """Test that the device could disconnect the adapter successfully.
989
990        This emulates the behavior that a device may initiate a
991        disconnection request before going into power saving mode.
992
993        Note: should not try to enter command mode in this method. When
994              a device is connected, there is no way to enter command mode.
995              One could just issue a special disconnect command without
996              entering command mode.
997
998        @param device: the bluetooth HID device
999
1000        @returns: True if disconnection is performed correctly by device and
1001                  the adapter also observes the disconnection.
1002                  False otherwise.
1003
1004        """
1005        method_name = 'test_disconnection_by_device'
1006        disconnection_by_device = False
1007        try:
1008            device.Disconnect()
1009            disconnection_by_device = True
1010        except Exception as e:
1011            logging.error('%s (device): %s', method_name, e)
1012        except:
1013            logging.error('%s (device): unexpected error', method_name)
1014
1015        disconnection_seen_by_adapter = False
1016        device_address = device.address
1017        device_is_connected = self.bluetooth_facade.device_is_connected
1018        try:
1019            utils.poll_for_condition(
1020                    condition=lambda: not device_is_connected(device_address),
1021                    timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
1022                    desc=('Waiting for disconnection from %s' % device_address))
1023            disconnection_seen_by_adapter = True
1024        except utils.TimeoutError as e:
1025            logging.error('%s (adapter): %s', method_name, e)
1026        except:
1027            logging.error('%s (adapter): unexpected error', method_name)
1028
1029        self.results = {
1030                'disconnection_by_device': disconnection_by_device,
1031                'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
1032        return all(self.results.values())
1033
1034    @_test_retry_and_log
1035    def test_device_is_connected(self, device_address):
1036        """Test that device address given is currently connected.
1037
1038        @param device_address: Address of the device.
1039
1040        @returns: True if the device is connected.
1041                  False otherwise.
1042
1043        """
1044        def _is_connected():
1045            """Test if device is connected.
1046
1047            @returns: True if device is connected. False otherwise.
1048
1049            """
1050            return self.bluetooth_facade.device_is_connected(device_address)
1051
1052
1053        method_name = 'test_device_is_connected'
1054        has_device = False
1055        connected = False
1056        if self.bluetooth_facade.has_device(device_address):
1057            has_device = True
1058            try:
1059                utils.poll_for_condition(
1060                        condition=_is_connected,
1061                        timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
1062                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1063                        desc='Waiting for connection to %s' % device_address)
1064                connected = True
1065            except utils.TimeoutError as e:
1066                logging.error('%s: %s', method_name, e)
1067            except:
1068                logging.error('%s: unexpected error', method_name)
1069        self.results = {'has_device': has_device, 'connected': connected}
1070        return all(self.results.values())
1071
1072
1073    @_test_retry_and_log
1074    def test_device_is_paired(self, device_address):
1075        """Test that the device address given is currently paired.
1076
1077        @param device_address: Address of the device.
1078
1079        @returns: True if the device is paired.
1080                  False otherwise.
1081
1082        """
1083        def _is_paired():
1084            """Test if device is paired.
1085
1086            @returns: True if device is paired. False otherwise.
1087
1088            """
1089            return self.bluetooth_facade.device_is_paired(device_address)
1090
1091
1092        method_name = 'test_device_is_paired'
1093        has_device = False
1094        paired = False
1095        if self.bluetooth_facade.has_device(device_address):
1096            has_device = True
1097            try:
1098                utils.poll_for_condition(
1099                        condition=_is_paired,
1100                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
1101                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1102                        desc='Waiting for connection to %s' % device_address)
1103                paired = True
1104            except utils.TimeoutError as e:
1105                logging.error('%s: %s', method_name, e)
1106            except:
1107                logging.error('%s: unexpected error', method_name)
1108        self.results = {'has_device': has_device, 'paired': paired}
1109        return all(self.results.values())
1110
1111
1112    def _get_device_name(self, device_address):
1113        """Get the device name.
1114
1115        @returns: True if the device name is derived. None otherwise.
1116
1117        """
1118        properties = self.bluetooth_facade.get_device_properties(
1119                device_address)
1120        self.discovered_device_name = properties.get('Name')
1121        return bool(self.discovered_device_name)
1122
1123
1124    @_test_retry_and_log
1125    def test_device_name(self, device_address, expected_device_name):
1126        """Test that the device name discovered by the adapter is correct.
1127
1128        @param device_address: Address of the device.
1129        @param expected_device_name: the bluetooth device name
1130
1131        @returns: True if the discovered_device_name is expected_device_name.
1132                  False otherwise.
1133
1134        """
1135        try:
1136            utils.poll_for_condition(
1137                    condition=lambda: self._get_device_name(device_address),
1138                    timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
1139                    sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
1140                    desc='Waiting for device name of %s' % device_address)
1141        except utils.TimeoutError as e:
1142            logging.error('test_device_name: %s', e)
1143        except:
1144            logging.error('test_device_name: unexpected error')
1145
1146        self.results = {
1147                'expected_device_name': expected_device_name,
1148                'discovered_device_name': self.discovered_device_name}
1149        return self.discovered_device_name == expected_device_name
1150
1151
1152    @_test_retry_and_log
1153    def test_device_class_of_service(self, device_address,
1154                                     expected_class_of_service):
1155        """Test that the discovered device class of service is as expected.
1156
1157        @param device_address: Address of the device.
1158        @param expected_class_of_service: the expected class of service
1159
1160        @returns: True if the discovered class of service matches the
1161                  expected class of service. False otherwise.
1162
1163        """
1164        properties = self.bluetooth_facade.get_device_properties(
1165                device_address)
1166        device_class = properties.get('Class')
1167        discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
1168                                       if device_class else None)
1169
1170        self.results = {
1171                'device_class': device_class,
1172                'expected_class_of_service': expected_class_of_service,
1173                'discovered_class_of_service': discovered_class_of_service}
1174        return discovered_class_of_service == expected_class_of_service
1175
1176
1177    @_test_retry_and_log
1178    def test_device_class_of_device(self, device_address,
1179                                    expected_class_of_device):
1180        """Test that the discovered device class of device is as expected.
1181
1182        @param device_address: Address of the device.
1183        @param expected_class_of_device: the expected class of device
1184
1185        @returns: True if the discovered class of device matches the
1186                  expected class of device. False otherwise.
1187
1188        """
1189        properties = self.bluetooth_facade.get_device_properties(
1190                device_address)
1191        device_class = properties.get('Class')
1192        discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
1193                                      if device_class else None)
1194
1195        self.results = {
1196                'device_class': device_class,
1197                'expected_class_of_device': expected_class_of_device,
1198                'discovered_class_of_device': discovered_class_of_device}
1199        return discovered_class_of_device == expected_class_of_device
1200
1201
1202    def _get_btmon_log(self, method, logging_timespan=1):
1203        """Capture the btmon log when executing the specified method.
1204
1205        @param method: the method to capture log.
1206                       The method would be executed only when it is not None.
1207                       This allows us to simply capture btmon log without
1208                       executing any command.
1209        @param logging_timespan: capture btmon log for logging_timespan seconds.
1210
1211        """
1212        self.bluetooth_le_facade.btmon_start()
1213        self.advertising_msg = method() if method else ''
1214        time.sleep(logging_timespan)
1215        self.bluetooth_le_facade.btmon_stop()
1216
1217
1218    def convert_to_adv_jiffies(self, adv_interval_ms):
1219        """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
1220
1221        @param adv_interval_ms: an advertising interval
1222
1223        @returns: the equivalent jiffies
1224
1225        """
1226        return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT
1227
1228
1229    def compute_duration(self, max_adv_interval_ms):
1230        """Compute duration from max_adv_interval_ms.
1231
1232        Advertising duration is calculated approximately as
1233            duration = max_adv_interval_ms / 1000.0 * 1.1
1234
1235        @param max_adv_interval_ms: max advertising interval in milliseconds.
1236
1237        @returns: duration in seconds.
1238
1239        """
1240        return max_adv_interval_ms / 1000.0 * 1.1
1241
1242
1243    def compute_logging_timespan(self, max_adv_interval_ms):
1244        """Compute the logging timespan from max_adv_interval_ms.
1245
1246        The logging timespan is the time needed to record btmon log.
1247
1248        @param max_adv_interval_ms: max advertising interval in milliseconds.
1249
1250        @returns: logging_timespan in seconds.
1251
1252        """
1253        duration = self.compute_duration(max_adv_interval_ms)
1254        logging_timespan = max(self.count_advertisements * duration, 1)
1255        return logging_timespan
1256
1257
1258    @_test_retry_and_log(False)
1259    def test_check_duration_and_intervals(self, min_adv_interval_ms,
1260                                          max_adv_interval_ms,
1261                                          number_advertisements):
1262        """Verify that every advertisements are scheduled according to the
1263        duration and intervals.
1264
1265        An advertisement would be scheduled at the time span of
1266             duration * number_advertisements
1267
1268        @param min_adv_interval_ms: min advertising interval in milliseconds.
1269        @param max_adv_interval_ms: max advertising interval in milliseconds.
1270        @param number_advertisements: the number of existing advertisements
1271
1272        @returns: True if all advertisements are scheduled based on the
1273                duration and intervals.
1274
1275        """
1276
1277
1278        def within_tolerance(expected, actual, max_error=0.1):
1279            """Determine if the percent error is within specified tolerance.
1280
1281            @param expected: The expected value.
1282            @param actual: The actual (measured) value.
1283            @param max_error: The maximum percent error acceptable.
1284
1285            @returns: True if the percent error is less than or equal to
1286                      max_error.
1287            """
1288            return abs(expected - actual) / abs(expected) <= max_error
1289
1290
1291        start_str = 'Set Advertising Intervals:'
1292        search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
1293        search_str = '|'.join(search_strings)
1294
1295        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
1296                                                      start_str=start_str)
1297
1298        # Company string looks like
1299        #   Company: not assigned (65283)
1300        company_pattern = re.compile('Company:.*\((\d*)\)')
1301
1302        # The string with timestamp looks like
1303        #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
1304        set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
1305        set_adv_time_pattern = re.compile(set_adv_time_str)
1306
1307        adv_timestamps = {}
1308        timestamp = None
1309        manufacturer_id = None
1310        for line in contents:
1311            result = set_adv_time_pattern.search(line)
1312            if result:
1313                timestamp = float(result.group(1))
1314
1315            result = company_pattern.search(line)
1316            if result:
1317                manufacturer_id = '0x%04x' % int(result.group(1))
1318
1319            if timestamp and manufacturer_id:
1320                if manufacturer_id not in adv_timestamps:
1321                    adv_timestamps[manufacturer_id] = []
1322                adv_timestamps[manufacturer_id].append(timestamp)
1323                timestamp = None
1324                manufacturer_id = None
1325
1326        duration = self.compute_duration(max_adv_interval_ms)
1327        expected_timespan = duration * number_advertisements
1328
1329        check_duration = True
1330        for manufacturer_id, values in adv_timestamps.iteritems():
1331            logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
1332            timespans = [values[i] - values[i - 1]
1333                         for i in xrange(1, len(values))]
1334            errors = [timespans[i] for i in xrange(len(timespans))
1335                      if not within_tolerance(expected_timespan, timespans[i])]
1336            logging.debug('timespans: %s', timespans)
1337            logging.debug('errors: %s', errors)
1338            if bool(errors):
1339                check_duration = False
1340
1341        # Verify that the advertising intervals are also correct.
1342        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1343                self._verify_advertising_intervals(min_adv_interval_ms,
1344                                                   max_adv_interval_ms))
1345
1346        self.results = {
1347                'check_duration': check_duration,
1348                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1349                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1350        }
1351        return all(self.results.values())
1352
1353
1354    def _get_min_max_intervals_strings(self, min_adv_interval_ms,
1355                                       max_adv_interval_ms):
1356        """Get the min and max advertising intervals strings shown in btmon.
1357
1358        Advertising intervals shown in the btmon log look like
1359            Min advertising interval: 1280.000 msec (0x0800)
1360            Max advertising interval: 1280.000 msec (0x0800)
1361
1362        @param min_adv_interval_ms: min advertising interval in milliseconds.
1363        @param max_adv_interval_ms: max advertising interval in milliseconds.
1364
1365        @returns: the min and max intervals strings.
1366
1367        """
1368        min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
1369                   (min_adv_interval_ms,
1370                    min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
1371        logging.debug('min_adv_interval_ms: %s', min_str)
1372
1373        max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
1374                   (max_adv_interval_ms,
1375                    max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
1376        logging.debug('max_adv_interval_ms: %s', max_str)
1377
1378        return (min_str, max_str)
1379
1380
1381    def _verify_advertising_intervals(self, min_adv_interval_ms,
1382                                      max_adv_interval_ms):
1383        """Verify min and max advertising intervals.
1384
1385        Advertising intervals look like
1386            Min advertising interval: 1280.000 msec (0x0800)
1387            Max advertising interval: 1280.000 msec (0x0800)
1388
1389        @param min_adv_interval_ms: min advertising interval in milliseconds.
1390        @param max_adv_interval_ms: max advertising interval in milliseconds.
1391
1392        @returns: a tuple of (True, True) if both min and max advertising
1393            intervals could be found. Otherwise, the corresponding element
1394            in the tuple if False.
1395
1396        """
1397        min_str, max_str = self._get_min_max_intervals_strings(
1398                min_adv_interval_ms, max_adv_interval_ms)
1399
1400        min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
1401        max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
1402
1403        return min_adv_interval_ms_found, max_adv_interval_ms_found
1404
1405
1406    @_test_retry_and_log(False)
1407    def test_register_advertisement(self, advertisement_data, instance_id,
1408                                    min_adv_interval_ms, max_adv_interval_ms):
1409        """Verify that an advertisement is registered correctly.
1410
1411        This test verifies the following data:
1412        - advertisement added
1413        - manufacturer data
1414        - service UUIDs
1415        - service data
1416        - advertising intervals
1417        - advertising enabled
1418
1419        @param advertisement_data: the data of an advertisement to register.
1420        @param instance_id: the instance id which starts at 1.
1421        @param min_adv_interval_ms: min_adv_interval in milliseconds.
1422        @param max_adv_interval_ms: max_adv_interval in milliseconds.
1423
1424        @returns: True if the advertisement is registered correctly.
1425                  False otherwise.
1426
1427        """
1428        # When registering a new advertisement, it is possible that another
1429        # instance is advertising. It may need to wait for all other
1430        # advertisements to complete advertising once.
1431        self.count_advertisements += 1
1432        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
1433        self._get_btmon_log(
1434                lambda: self.bluetooth_le_facade.register_advertisement(
1435                        advertisement_data),
1436                logging_timespan=logging_timespan)
1437
1438        # Verify that a new advertisement is added.
1439        advertisement_added = (
1440                self.bluetooth_le_facade.btmon_find('Advertising Added') and
1441                self.bluetooth_le_facade.btmon_find('Instance: %d' %
1442                                                    instance_id))
1443
1444        # Verify that the manufacturer data could be found.
1445        manufacturer_data = advertisement_data.get('ManufacturerData', '')
1446        for manufacturer_id in manufacturer_data:
1447            # The 'not assigned' text below means the manufacturer id
1448            # is not actually assigned to any real manufacturer.
1449            # For real 16-bit manufacturer UUIDs, refer to
1450            #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
1451            manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
1452                    'Company: not assigned (%d)' % int(manufacturer_id, 16))
1453
1454        # Verify that all service UUIDs could be found.
1455        service_uuids_found = True
1456        for uuid in advertisement_data.get('ServiceUUIDs', []):
1457            # Service UUIDs looks like ['0x180D', '0x180F']
1458            #   Heart Rate (0x180D)
1459            #   Battery Service (0x180F)
1460            # For actual 16-bit service UUIDs, refer to
1461            #   https://www.bluetooth.com/specifications/gatt/services
1462            if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
1463                service_uuids_found = False
1464                break
1465
1466        # Verify service data.
1467        service_data_found = True
1468        for uuid, data in advertisement_data.get('ServiceData', {}).items():
1469            # A service data looks like
1470            #   Service Data (UUID 0x9999): 0001020304
1471            # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
1472            data_str = ''.join(map(lambda n: '%02x' % n, data))
1473            if not self.bluetooth_le_facade.btmon_find(
1474                    'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
1475                service_data_found = False
1476                break
1477
1478        # Verify that the advertising intervals are correct.
1479        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1480                self._verify_advertising_intervals(min_adv_interval_ms,
1481                                                   max_adv_interval_ms))
1482
1483        # Verify advertising is enabled.
1484        advertising_enabled = self.bluetooth_le_facade.btmon_find(
1485                'Advertising: Enabled (0x01)')
1486
1487        self.results = {
1488                'advertisement_added': advertisement_added,
1489                'manufacturer_data_found': manufacturer_data_found,
1490                'service_uuids_found': service_uuids_found,
1491                'service_data_found': service_data_found,
1492                'min_adv_interval_ms_found': min_adv_interval_ms_found,
1493                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1494                'advertising_enabled': advertising_enabled,
1495        }
1496        return all(self.results.values())
1497
1498
1499    @_test_retry_and_log(False)
1500    def test_fail_to_register_advertisement(self, advertisement_data,
1501                                            min_adv_interval_ms,
1502                                            max_adv_interval_ms):
1503        """Verify that failure is incurred when max advertisements are reached.
1504
1505        This test verifies that a registration failure is incurred when
1506        max advertisements are reached. The error message looks like:
1507
1508            org.bluez.Error.Failed: Maximum advertisements reached
1509
1510        @param advertisement_data: the advertisement to register.
1511        @param min_adv_interval_ms: min_adv_interval in milliseconds.
1512        @param max_adv_interval_ms: max_adv_interval in milliseconds.
1513
1514        @returns: True if the error message is received correctly.
1515                  False otherwise.
1516
1517        """
1518        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
1519        self._get_btmon_log(
1520                lambda: self.bluetooth_le_facade.register_advertisement(
1521                        advertisement_data),
1522                logging_timespan=logging_timespan)
1523
1524        # Verify that it failed to register advertisement due to the fact
1525        # that max advertisements are reached.
1526        failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT
1527                                    in self.advertising_msg)
1528
1529        # Verify that no new advertisement is added.
1530        advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
1531                'Advertising Added:')
1532
1533        # Verify that the advertising intervals are correct.
1534        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1535                self._verify_advertising_intervals(min_adv_interval_ms,
1536                                                   max_adv_interval_ms))
1537
1538        # Verify advertising remains enabled.
1539        advertising_enabled = self.bluetooth_le_facade.btmon_find(
1540                'Advertising: Enabled (0x01)')
1541
1542        self.results = {
1543                'failed_to_register_error': failed_to_register_error,
1544                'advertisement_not_added': advertisement_not_added,
1545                'min_adv_interval_ms_found': min_adv_interval_ms_found,
1546                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1547                'advertising_enabled': advertising_enabled,
1548        }
1549        return all(self.results.values())
1550
1551
1552    @_test_retry_and_log(False)
1553    def test_unregister_advertisement(self, advertisement_data, instance_id,
1554                                      advertising_disabled):
1555        """Verify that an advertisement is unregistered correctly.
1556
1557        This test verifies the following data:
1558        - advertisement removed
1559        - advertising status: enabled if there are advertisements left;
1560                              disabled otherwise.
1561
1562        @param advertisement_data: the data of an advertisement to unregister.
1563        @param instance_id: the instance id of the advertisement to remove.
1564        @param advertising_disabled: is advertising disabled? This happens
1565                only when all advertisements are removed.
1566
1567        @returns: True if the advertisement is unregistered correctly.
1568                  False otherwise.
1569
1570        """
1571        self.count_advertisements -= 1
1572        self._get_btmon_log(
1573                lambda: self.bluetooth_le_facade.unregister_advertisement(
1574                        advertisement_data))
1575
1576        # Verify that the advertisement is removed.
1577        advertisement_removed = (
1578                self.bluetooth_le_facade.btmon_find('Advertising Removed') and
1579                self.bluetooth_le_facade.btmon_find('Instance: %d' %
1580                                                    instance_id))
1581
1582        # If advertising_disabled is True, there should be no log like
1583        #       'Advertising: Enabled (0x01)'
1584        # If advertising_disabled is False, there should be log like
1585        #       'Advertising: Enabled (0x01)'
1586
1587        # Only need to check advertising status when the last advertisement
1588        # is removed. For any other advertisements prior to the last one,
1589        # we may or may not observe 'Advertising: Enabled (0x01)' message.
1590        # Hence, the test would become flaky if we insist to see that message.
1591        # A possible workaround is to sleep for a while and then check the
1592        # message. The drawback is that we may need to wait up to 10 seconds
1593        # if the advertising duration and intervals are long.
1594        # In a test case, we always run test_check_duration_and_intervals()
1595        # to check if advertising duration and intervals are correct after
1596        # un-registering one or all advertisements, it is safe to do so.
1597        advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
1598                'Advertising: Enabled (0x01)')
1599        advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
1600                'Advertising: Disabled (0x00)')
1601        advertising_status_correct = not advertising_disabled or (
1602                advertising_disabled_found and not advertising_enabled_found)
1603
1604        self.results = {
1605                'advertisement_removed': advertisement_removed,
1606                'advertising_status_correct': advertising_status_correct,
1607        }
1608        return all(self.results.values())
1609
1610
1611    @_test_retry_and_log(False)
1612    def test_set_advertising_intervals(self, min_adv_interval_ms,
1613                                       max_adv_interval_ms):
1614        """Verify that new advertising intervals are set correctly.
1615
1616        Note that setting advertising intervals does not enable/disable
1617        advertising. Hence, there is no need to check the advertising
1618        status.
1619
1620        @param min_adv_interval_ms: the min advertising interval in ms.
1621        @param max_adv_interval_ms: the max advertising interval in ms.
1622
1623        @returns: True if the new advertising intervals are correct.
1624                  False otherwise.
1625
1626        """
1627        self._get_btmon_log(
1628                lambda: self.bluetooth_le_facade.set_advertising_intervals(
1629                        min_adv_interval_ms, max_adv_interval_ms))
1630
1631        # Verify the new advertising intervals.
1632        # With intervals of 200 ms and 200 ms, the log looks like
1633        #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
1634        txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
1635        adv_intervals_found = self.bluetooth_le_facade.btmon_find(
1636                txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
1637                       self.convert_to_adv_jiffies(max_adv_interval_ms)))
1638
1639        self.results = {'adv_intervals_found': adv_intervals_found}
1640        return all(self.results.values())
1641
1642
1643    @_test_retry_and_log(False)
1644    def test_fail_to_set_advertising_intervals(
1645            self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
1646            orig_min_adv_interval_ms, orig_max_adv_interval_ms):
1647        """Verify that setting invalid advertising intervals results in error.
1648
1649        If invalid min/max advertising intervals are given, it would incur
1650        the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
1651        Note that valid advertising intervals fall between 20 ms and 10,240 ms.
1652
1653        @param invalid_min_adv_interval_ms: the invalid min advertising interval
1654                in ms.
1655        @param invalid_max_adv_interval_ms: the invalid max advertising interval
1656                in ms.
1657        @param orig_min_adv_interval_ms: the original min advertising interval
1658                in ms.
1659        @param orig_max_adv_interval_ms: the original max advertising interval
1660                in ms.
1661
1662        @returns: True if it fails to set invalid advertising intervals.
1663                  False otherwise.
1664
1665        """
1666        self._get_btmon_log(
1667                lambda: self.bluetooth_le_facade.set_advertising_intervals(
1668                        invalid_min_adv_interval_ms,
1669                        invalid_max_adv_interval_ms))
1670
1671        # Verify that the invalid error is observed in the dbus error callback
1672        # message.
1673        invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
1674                                   self.advertising_msg)
1675
1676        # Verify that the min/max advertising intervals remain the same
1677        # after setting the invalid advertising intervals.
1678        #
1679        # In btmon log, we would see the following message first.
1680        #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
1681        # And then, we should check if "Min advertising interval" and
1682        # "Max advertising interval" remain the same.
1683        start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
1684                self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
1685                self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
1686
1687        search_strings = ['Min advertising interval:',
1688                          'Max advertising interval:']
1689        search_str = '|'.join(search_strings)
1690
1691        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
1692                                                      start_str=start_str)
1693
1694        # The min/max advertising intervals of all advertisements should remain
1695        # the same as the previous valid ones.
1696        min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
1697        min_max_pattern = re.compile(min_max_str)
1698        correct_orig_min_adv_interval = True
1699        correct_orig_max_adv_interval = True
1700        for line in contents:
1701            result = min_max_pattern.search(line)
1702            if result:
1703                interval = float(result.group(1))
1704                if 'Min' in line and interval != orig_min_adv_interval_ms:
1705                    correct_orig_min_adv_interval = False
1706                elif 'Max' in line and interval != orig_max_adv_interval_ms:
1707                    correct_orig_max_adv_interval = False
1708
1709        self.results = {
1710                'invalid_intervals_error': invalid_intervals_error,
1711                'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
1712                'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
1713
1714        return all(self.results.values())
1715
1716
1717    @_test_retry_and_log(False)
1718    def test_check_advertising_intervals(self, min_adv_interval_ms,
1719                                         max_adv_interval_ms):
1720        """Verify that the advertising intervals are as expected.
1721
1722        @param min_adv_interval_ms: the min advertising interval in ms.
1723        @param max_adv_interval_ms: the max advertising interval in ms.
1724
1725        @returns: True if the advertising intervals are correct.
1726                  False otherwise.
1727
1728        """
1729        self._get_btmon_log(None)
1730
1731        # Verify that the advertising intervals are correct.
1732        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1733                self._verify_advertising_intervals(min_adv_interval_ms,
1734                                                   max_adv_interval_ms))
1735
1736        self.results = {
1737                'min_adv_interval_ms_found': min_adv_interval_ms_found,
1738                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1739        }
1740        return all(self.results.values())
1741
1742
1743    @_test_retry_and_log(False)
1744    def test_reset_advertising(self, instance_ids=[]):
1745        """Verify that advertising is reset correctly.
1746
1747        Note that reset advertising would set advertising intervals to
1748        the default values. However, we would not be able to observe
1749        the values change until new advertisements are registered.
1750        Therefore, it is required that a test_register_advertisement()
1751        test is conducted after this test.
1752
1753        If instance_ids is [], all advertisements would still be removed
1754        if there are any. However, no need to check 'Advertising Removed'
1755        in btmon log since we may or may not be able to observe the message.
1756        This feature is needed if this test is invoked as the first one in
1757        a test case to reset advertising. In this situation, this test does
1758        not know how many advertisements exist.
1759
1760        @param instance_ids: the list of instance IDs that should be removed.
1761
1762        @returns: True if advertising is reset correctly.
1763                  False otherwise.
1764
1765        """
1766        self.count_advertisements = 0
1767        self._get_btmon_log(
1768                lambda: self.bluetooth_le_facade.reset_advertising())
1769
1770        # Verify that every advertisement is removed. When an advertisement
1771        # with instance id 1 is removed, the log looks like
1772        #   Advertising Removed
1773        #       instance: 1
1774        if len(instance_ids) > 0:
1775            advertisement_removed = self.bluetooth_le_facade.btmon_find(
1776                    'Advertising Removed')
1777            if advertisement_removed:
1778                for instance_id in instance_ids:
1779                    txt = 'Instance: %d' % instance_id
1780                    if not self.bluetooth_le_facade.btmon_find(txt):
1781                        advertisement_removed = False
1782                        break
1783        else:
1784            advertisement_removed = True
1785
1786        if not advertisement_removed:
1787            logging.error('Failed to remove advertisement')
1788
1789        # Verify that "Reset Advertising Intervals" command has been issued.
1790        reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
1791                'bluetoothd: Reset Advertising Intervals')
1792
1793        # Verify the advertising is disabled.
1794        advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
1795                'Advertising: Disabled')
1796        # If there are no existing advertisements, we may not observe
1797        # 'Advertising: Disabled'.
1798        advertising_disabled = (instance_ids == [] or
1799                                advertising_disabled_observied)
1800
1801        self.results = {
1802                'advertisement_removed': advertisement_removed,
1803                'reset_advertising_intervals': reset_advertising_intervals,
1804                'advertising_disabled': advertising_disabled,
1805        }
1806        return all(self.results.values())
1807
1808
1809    # -------------------------------------------------------------------
1810    # Bluetooth mouse related tests
1811    # -------------------------------------------------------------------
1812
1813
1814    def _record_input_events(self, device, gesture):
1815        """Record the input events.
1816
1817        @param device: the bluetooth HID device.
1818        @param gesture: the gesture method to perform.
1819
1820        @returns: the input events received on the DUT.
1821
1822        """
1823        self.input_facade.initialize_input_recorder(device.name)
1824        self.input_facade.start_input_recorder()
1825        time.sleep(self.HID_REPORT_SLEEP_SECS)
1826        gesture()
1827        time.sleep(self.HID_REPORT_SLEEP_SECS)
1828        self.input_facade.stop_input_recorder()
1829        time.sleep(self.HID_REPORT_SLEEP_SECS)
1830        event_values = self.input_facade.get_input_events()
1831        events = [Event(*ev) for ev in event_values]
1832        return events
1833
1834
1835    def _test_mouse_click(self, device, button):
1836        """Test that the mouse click events could be received correctly.
1837
1838        @param device: the meta device containing a bluetooth HID device
1839        @param button: which button to test, 'LEFT' or 'RIGHT'
1840
1841        @returns: True if the report received by the host matches the
1842                  expected one. False otherwise.
1843
1844        """
1845        if button == 'LEFT':
1846            gesture = device.LeftClick
1847        elif button == 'RIGHT':
1848            gesture = device.RightClick
1849        else:
1850            raise error.TestError('Button (%s) is not valid.' % button)
1851
1852        actual_events = self._record_input_events(device, gesture)
1853
1854        linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
1855        expected_events = [
1856                # Button down
1857                recorder.MSC_SCAN_BTN_EVENT[button],
1858                Event(EV_KEY, linux_input_button[button], 1),
1859                recorder.SYN_EVENT,
1860                # Button up
1861                recorder.MSC_SCAN_BTN_EVENT[button],
1862                Event(EV_KEY, linux_input_button[button], 0),
1863                recorder.SYN_EVENT]
1864
1865        self.results = {
1866                'actual_events': map(str, actual_events),
1867                'expected_events': map(str, expected_events)}
1868        return actual_events == expected_events
1869
1870
1871    @_test_retry_and_log
1872    def test_mouse_left_click(self, device):
1873        """Test that the mouse left click events could be received correctly.
1874
1875        @param device: the meta device containing a bluetooth HID device
1876
1877        @returns: True if the report received by the host matches the
1878                  expected one. False otherwise.
1879
1880        """
1881        return self._test_mouse_click(device, 'LEFT')
1882
1883
1884    @_test_retry_and_log
1885    def test_mouse_right_click(self, device):
1886        """Test that the mouse right click events could be received correctly.
1887
1888        @param device: the meta device containing a bluetooth HID device
1889
1890        @returns: True if the report received by the host matches the
1891                  expected one. False otherwise.
1892
1893        """
1894        return self._test_mouse_click(device, 'RIGHT')
1895
1896
1897    def _test_mouse_move(self, device, delta_x=0, delta_y=0):
1898        """Test that the mouse move events could be received correctly.
1899
1900        @param device: the meta device containing a bluetooth HID device
1901        @param delta_x: the distance to move cursor in x axis
1902        @param delta_y: the distance to move cursor in y axis
1903
1904        @returns: True if the report received by the host matches the
1905                  expected one. False otherwise.
1906
1907        """
1908        gesture = lambda: device.Move(delta_x, delta_y)
1909        actual_events = self._record_input_events(device, gesture)
1910
1911        events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
1912        events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
1913        expected_events = events_x + events_y + [recorder.SYN_EVENT]
1914
1915        self.results = {
1916                'actual_events': map(str, actual_events),
1917                'expected_events': map(str, expected_events)}
1918        return actual_events == expected_events
1919
1920
1921    @_test_retry_and_log
1922    def test_mouse_move_in_x(self, device, delta_x):
1923        """Test that the mouse move events in x could be received correctly.
1924
1925        @param device: the meta device containing a bluetooth HID device
1926        @param delta_x: the distance to move cursor in x axis
1927
1928        @returns: True if the report received by the host matches the
1929                  expected one. False otherwise.
1930
1931        """
1932        return self._test_mouse_move(device, delta_x=delta_x)
1933
1934
1935    @_test_retry_and_log
1936    def test_mouse_move_in_y(self, device, delta_y):
1937        """Test that the mouse move events in y could be received correctly.
1938
1939        @param device: the meta device containing a bluetooth HID device
1940        @param delta_y: the distance to move cursor in y axis
1941
1942        @returns: True if the report received by the host matches the
1943                  expected one. False otherwise.
1944
1945        """
1946        return self._test_mouse_move(device, delta_y=delta_y)
1947
1948
1949    @_test_retry_and_log
1950    def test_mouse_move_in_xy(self, device, delta_x, delta_y):
1951        """Test that the mouse move events could be received correctly.
1952
1953        @param device: the meta device containing a bluetooth HID device
1954        @param delta_x: the distance to move cursor in x axis
1955        @param delta_y: the distance to move cursor in y axis
1956
1957        @returns: True if the report received by the host matches the
1958                  expected one. False otherwise.
1959
1960        """
1961        return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
1962
1963
1964    def _test_mouse_scroll(self, device, units):
1965        """Test that the mouse wheel events could be received correctly.
1966
1967        @param device: the meta device containing a bluetooth HID device
1968        @param units: the units to scroll in y axis
1969
1970        @returns: True if the report received by the host matches the
1971                  expected one. False otherwise.
1972
1973        """
1974        gesture = lambda: device.Scroll(units)
1975        actual_events = self._record_input_events(device, gesture)
1976        expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
1977        self.results = {
1978                'actual_events': map(str, actual_events),
1979                'expected_events': map(str, expected_events)}
1980        return actual_events == expected_events
1981
1982
1983    @_test_retry_and_log
1984    def test_mouse_scroll_down(self, device, delta_y):
1985        """Test that the mouse wheel events could be received correctly.
1986
1987        @param device: the meta device containing a bluetooth HID device
1988        @param delta_y: the units to scroll down in y axis;
1989                        should be a postive value
1990
1991        @returns: True if the report received by the host matches the
1992                  expected one. False otherwise.
1993
1994        """
1995        if delta_y > 0:
1996            return self._test_mouse_scroll(device, delta_y)
1997        else:
1998            raise error.TestError('delta_y (%d) should be a positive value',
1999                                  delta_y)
2000
2001
2002    @_test_retry_and_log
2003    def test_mouse_scroll_up(self, device, delta_y):
2004        """Test that the mouse wheel events could be received correctly.
2005
2006        @param device: the meta device containing a bluetooth HID device
2007        @param delta_y: the units to scroll up in y axis;
2008                        should be a postive value
2009
2010        @returns: True if the report received by the host matches the
2011                  expected one. False otherwise.
2012
2013        """
2014        if delta_y > 0:
2015            return self._test_mouse_scroll(device, -delta_y)
2016        else:
2017            raise error.TestError('delta_y (%d) should be a positive value',
2018                                  delta_y)
2019
2020
2021    @_test_retry_and_log
2022    def test_mouse_click_and_drag(self, device, delta_x, delta_y):
2023        """Test that the mouse click-and-drag events could be received
2024        correctly.
2025
2026        @param device: the meta device containing a bluetooth HID device
2027        @param delta_x: the distance to drag in x axis
2028        @param delta_y: the distance to drag in y axis
2029
2030        @returns: True if the report received by the host matches the
2031                  expected one. False otherwise.
2032
2033        """
2034        gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
2035        actual_events = self._record_input_events(device, gesture)
2036
2037        button = 'LEFT'
2038        expected_events = (
2039                [# Button down
2040                 recorder.MSC_SCAN_BTN_EVENT[button],
2041                 Event(EV_KEY, BTN_LEFT, 1),
2042                 recorder.SYN_EVENT] +
2043                # cursor movement in x and y
2044                ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
2045                ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
2046                [recorder.SYN_EVENT] +
2047                # Button up
2048                [recorder.MSC_SCAN_BTN_EVENT[button],
2049                 Event(EV_KEY, BTN_LEFT, 0),
2050                 recorder.SYN_EVENT])
2051
2052        self.results = {
2053                'actual_events': map(str, actual_events),
2054                'expected_events': map(str, expected_events)}
2055        return actual_events == expected_events
2056
2057
2058    # -------------------------------------------------------------------
2059    # Autotest methods
2060    # -------------------------------------------------------------------
2061
2062
2063    def initialize(self):
2064        """Initialize bluetooth adapter tests."""
2065        # Run through every tests and collect failed tests in self.fails.
2066        self.fails = []
2067
2068        # If a test depends on multiple conditions, write the results of
2069        # the conditions in self.results so that it is easy to know
2070        # what conditions failed by looking at the log.
2071        self.results = None
2072
2073        # Some tests may instantiate a peripheral device for testing.
2074        self.devices = dict()
2075        for device_type in SUPPORTED_DEVICE_TYPES:
2076            self.devices[device_type] = None
2077
2078        # The count of registered advertisements.
2079        self.count_advertisements = 0
2080
2081
2082    def check_chameleon(self):
2083        """Check the existence of chameleon_host.
2084
2085        The chameleon_host is specified in --args as follows
2086
2087        (cr) $ test_that --args "chameleon_host=$CHAMELEON_IP" "$DUT_IP" <test>
2088
2089        """
2090        logging.debug('labels: %s', self.host.get_labels())
2091        if self.host.chameleon is None:
2092            raise error.TestError('Have to specify chameleon_host IP.')
2093
2094
2095    def run_once(self, *args, **kwargs):
2096        """This method should be implemented by children classes.
2097
2098        Typically, the run_once() method would look like:
2099
2100        factory = remote_facade_factory.RemoteFacadeFactory(host)
2101        self.bluetooth_facade = factory.create_bluetooth_hid_facade()
2102
2103        self.test_bluetoothd_running()
2104        # ...
2105        # invoke more self.test_xxx() tests.
2106        # ...
2107
2108        if self.fails:
2109            raise error.TestFail(self.fails)
2110
2111        """
2112        raise NotImplementedError
2113
2114
2115    def cleanup(self):
2116        """Clean up bluetooth adapter tests."""
2117        # Close the device properly if a device is instantiated.
2118        # Note: do not write something like the following statements
2119        #           if self.devices[device_type]:
2120        #       or
2121        #           if bool(self.devices[device_type]):
2122        #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
2123        #       which just does not exist.
2124        for device_type in SUPPORTED_DEVICE_TYPES:
2125            if self.devices[device_type] is not None:
2126                self.devices[device_type].Close()
2127