• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Server side bluetooth adapter subtests."""
6
7import errno
8import functools
9import httplib
10import inspect
11import logging
12import os
13import re
14from socket import error as SocketError
15import time
16
17import bluetooth_test_utils
18
19from autotest_lib.client.bin import utils
20from autotest_lib.client.bin.input import input_event_recorder as recorder
21from autotest_lib.client.common_lib import error
22from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket
23from autotest_lib.client.cros.chameleon import chameleon
24from autotest_lib.server import test
25
26from autotest_lib.client.bin.input.linux_input import (
27        BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL)
28from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import (
29        GATT_ClientFacade, GATT_Application, GATT_HIDApplication)
30from autotest_lib.server.cros.multimedia import remote_facade_factory
31
32
33Event = recorder.Event
34
35# Location of data traces relative to this (bluetooth_adapter_tests.py) file
36BT_ADAPTER_TEST_PATH = os.path.dirname(__file__)
37TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard')
38
39# Delay binding the methods since host is only available at run time.
40SUPPORTED_DEVICE_TYPES = {
41    'MOUSE': lambda chameleon: chameleon.get_bluetooth_hid_mouse,
42    'KEYBOARD': lambda chameleon: chameleon.get_bluetooth_hid_keyboard,
43    'BLE_MOUSE': lambda chameleon: chameleon.get_ble_mouse,
44    'BLE_KEYBOARD': lambda chameleon: chameleon.get_ble_keyboard,
45    'A2DP_SINK': lambda chameleon: chameleon.get_bluetooth_a2dp_sink,
46
47    # This is a base object that does not emulate any Bluetooth device.
48    # This object is preferred when only a pure XMLRPC server is needed
49    # on the chameleon host, e.g., to perform servod methods.
50    'BLUETOOTH_BASE': lambda chameleon: chameleon.get_bluetooth_base,
51}
52
53
54def method_name():
55    """Get the method name of a class.
56
57    This function is supposed to be invoked inside a class and will
58    return current method name who invokes this function.
59
60    @returns: the string of the method name inside the class.
61    """
62    return inspect.getouterframes(inspect.currentframe())[1][3]
63
64
65def _run_method(method, method_name, *args, **kwargs):
66    """Run a target method and capture exceptions if any.
67
68    This is just a wrapper of the target method so that we do not need to
69    write the exception capturing structure repeatedly. The method could
70    be either a device method or a facade method.
71
72    @param method: the method to run
73    @param method_name: the name of the method
74
75    @returns: the return value of target method() if successful.
76              False otherwise.
77
78    """
79    result = False
80    try:
81        result = method(*args, **kwargs)
82    except Exception as e:
83        logging.error('%s: %s', method_name, e)
84    except:
85        logging.error('%s: unexpected error', method_name)
86    return result
87
88
89def get_bluetooth_emulated_device(chameleon, device_type):
90    """Get the bluetooth emulated device object.
91
92    @param chameleon: the chameleon host
93    @param device_type : the bluetooth device type, e.g., 'MOUSE'
94
95    @returns: the bluetooth device object
96
97    """
98
99    def _retry_device_method(method_name, legal_falsy_values=[]):
100        """retry the emulated device's method.
101
102        The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().
103
104        Note that the method name string is provided to get the device's actual
105        method object at run time through getattr(). The rebinding is required
106        because a new device may have been created previously or during the
107        execution of fix_serial_device().
108
109        Given a device's method, it is not feasible to get the method name
110        through __name__ attribute. This limitation is due to the fact that
111        the device is a dotted object of an XML RPC server proxy.
112        As an example, with the method name 'GetAdvertisedName', we could
113        derive the correspoinding method device.GetAdvertisedName. On the
114        contrary, given device.GetAdvertisedName, it is not feasible to get the
115        method name by device.GetAdvertisedName.__name__
116
117        Also note that if the device method fails, we would try remediation
118        step and retry the device method. The remediation steps are
119         1) re-creating the serial device.
120         2) reset (powercycle) the bluetooth dongle.
121         3) reboot chameleond host.
122        If the device method still fails after these steps, we fail the test
123
124        The default values exist for uses of this function before the options
125        were added, ideally we should change zero_ok to False.
126
127        @param method_name: the string of the method name.
128        @param legal_falsy_values: Values that are falsy but might be OK.
129
130        @returns: the result returned by the device's method if the call was
131                  successful
132
133        @raises: TestError if the devices's method fails or if repair of
134                 peripheral kit fails
135
136        """
137
138        action_table = [('recreate' , 'Fixing the serial device'),
139                        ('reset', 'Power cycle the peer device'),
140                        ('reboot', 'Reboot the chamleond host')]
141
142        for i, (action, description) in enumerate(action_table):
143            logging.info('Attempt %s : %s ', i+1, method_name)
144
145            result = _run_method(getattr(device, method_name), method_name)
146            if _is_successful(result, legal_falsy_values):
147                return result
148
149            logging.error('%s failed the %s time. Attempting to %s',
150                          method_name,i,description)
151            if not fix_serial_device(chameleon, device, action):
152                logging.info('%s failed', description)
153            else:
154                logging.info('%s successful', description)
155
156        #try it last time after fix it by last action
157        result = _run_method(getattr(device, method_name), method_name)
158        if _is_successful(result, legal_falsy_values):
159            return result
160
161        raise error.TestError('Failed to execute %s. Bluetooth peer device is'
162                              'not working' % method_name)
163
164
165    if device_type not in SUPPORTED_DEVICE_TYPES:
166        raise error.TestError('The device type is not supported: %s',
167                              device_type)
168
169    # Get the bluetooth device object and query some important properties.
170    device = SUPPORTED_DEVICE_TYPES[device_type](chameleon)()
171
172    # Get some properties of the kit
173    # NOTE: Strings updated here must be kept in sync with Chameleon.
174    device._capabilities = _retry_device_method('GetCapabilities')
175    device._transports = device._capabilities["CAP_TRANSPORTS"]
176    device._is_le_only = ("TRANSPORT_LE" in device._transports and
177                          len(device._transports) == 1)
178    device._has_pin = device._capabilities["CAP_HAS_PIN"]
179    device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]
180
181    _retry_device_method('Init')
182    logging.info('device type: %s', device_type)
183
184    device.name = _retry_device_method('GetAdvertisedName')
185    logging.info('device name: %s', device.name)
186
187    device.address = _retry_device_method('GetLocalBluetoothAddress')
188    logging.info('address: %s', device.address)
189
190    pin_falsy_values = [] if device._has_pin else [None]
191    device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
192    logging.info('pin: %s', device.pin)
193
194    class_falsy_values = [None] if device._is_le_only else [0]
195
196    # Class of service is None for LE-only devices. Don't fail or parse it.
197    device.class_of_service = _retry_device_method('GetClassOfService',
198                                                   class_falsy_values)
199    if device._is_le_only:
200      parsed_class_of_service = device.class_of_service
201    else:
202      parsed_class_of_service = "0x%04X" % device.class_of_service
203    logging.info('class of service: %s', parsed_class_of_service)
204
205    device.class_of_device = _retry_device_method('GetClassOfDevice',
206                                                  class_falsy_values)
207    # Class of device is None for LE-only devices. Don't fail or parse it.
208    if device._is_le_only:
209      parsed_class_of_device = device.class_of_device
210    else:
211      parsed_class_of_device = "0x%04X" % device.class_of_device
212    logging.info('class of device: %s', parsed_class_of_device)
213
214    device.device_type = _retry_device_method('GetHIDDeviceType')
215    logging.info('device type: %s', device.device_type)
216
217    device.authentication_mode = None
218    if not device._is_le_only:
219      device.authentication_mode = _retry_device_method('GetAuthenticationMode')
220      logging.info('authentication mode: %s', device.authentication_mode)
221
222    device.port = _retry_device_method('GetPort')
223    logging.info('serial port: %s\n', device.port)
224
225    return device
226
227
228def recreate_serial_device(device):
229    """Create and connect to a new serial device.
230
231    @param device: the bluetooth HID device
232
233    @returns: True if the serial device is re-created successfully.
234
235    """
236    logging.info('Remove the old serial device and create a new one.')
237    if device is not None:
238        try:
239            device.Close()
240        except:
241            logging.error('failed to close the serial device.')
242            return False
243    try:
244        device.CreateSerialDevice()
245        return True
246    except:
247        logging.error('failed to invoke CreateSerialDevice.')
248        return False
249
250
251def _check_device_init(device, operation):
252    # Check if the serial device could initialize, connect, and
253    # enter command mode correctly.
254    logging.info('Checking device status...')
255    if not _run_method(device.Init, 'Init'):
256        logging.info('device.Init: failed after %s', operation)
257        return False
258    if not device.CheckSerialConnection():
259        logging.info('device.CheckSerialConnection: failed after %s', operation)
260        return False
261    if not _run_method(device.EnterCommandMode, 'EnterCommandMode'):
262        logging.info('device.EnterCommandMode: failed after %s', operation)
263        return False
264    logging.info('The device is created successfully after %s.', operation)
265    return True
266
267def _reboot_chameleon(chameleon, device):
268    """ Reboot chameleond host
269
270    Also power cycle the device since reboot may not do that.."""
271
272    # Chameleond fizz hosts should have write protect removed and
273    # set_gbb_flags set to 0 to minimize boot time
274    REBOOT_SLEEP_SECS = 10
275    RESET_SLEEP_SECS = 1
276
277    # Close the bluetooth peripheral device and reboot the chameleon board.
278    device.Close()
279    logging.info("Powercycling the device")
280    device.PowerCycle()
281    time.sleep(RESET_SLEEP_SECS)
282    logging.info('rebooting chameleon...')
283    chameleon.reboot()
284
285    # Every chameleon reboot would take a bit more than REBOOT_SLEEP_SECS.
286    # Sleep REBOOT_SLEEP_SECS and then begin probing the chameleon board.
287    time.sleep(REBOOT_SLEEP_SECS)
288    return _check_device_init(device, 'reboot')
289
290def _reset_device_power(device):
291    """Power cycle the device."""
292    RESET_SLEEP_SECS = 1
293    try:
294        if not device.PowerCycle():
295            logging.info('device.PowerCycle() failed')
296            return False
297    except:
298        logging.error('exception in device.PowerCycle')
299    else:
300        logging.info('device powercycled')
301    time.sleep(RESET_SLEEP_SECS)
302    return _check_device_init(device, 'reset')
303
304def _is_successful(result, legal_falsy_values=[]):
305    """Is the method result considered successful?
306
307    Some method results, for example that of class_of_service, may be 0 which is
308    considered a valid result. Occassionally, None is acceptable.
309
310    The default values exist for uses of this function before the options were
311    added, ideally we should change zero_ok to False.
312
313    @param result: a method result
314    @param legal_falsy_values: Values that are falsy but might be OK.
315
316    @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if
317              result is None and none_ok.
318    """
319    truthiness_of_result = bool(result)
320    return truthiness_of_result or result in legal_falsy_values
321
322
323def fix_serial_device(chameleon, device, operation='reset'):
324    """Fix the serial device.
325
326    This function tries to fix the serial device by
327    (1) re-creating a serial device, or
328    (2) power cycling the usb port to which device is connected
329    (3) rebooting the chameleon board.
330
331    Argument operation determine which of the steps above are perform
332
333    Note that rebooting the chameleon board or reseting the device will remove
334    the state on the peripheral which might cause test failures. Please use
335    reset/reboot only before or after a test.
336
337    @param chameleon: the chameleon host
338    @param device: the bluetooth device.
339    @param operation: Recovery operation to perform 'recreate/reset/reboot'
340
341    @returns: True if the serial device is fixed. False otherwise.
342
343    """
344
345    if operation == 'recreate':
346        # Check the serial connection. Fix it if needed.
347        if device.CheckSerialConnection():
348            # The USB serial connection still exists.
349            # Re-connection suffices to solve the problem. The problem
350            # is usually caused by serial port change. For example,
351            # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1.
352            logging.info('retry: creating a new serial device...')
353            return recreate_serial_device(device)
354        else:
355            # Recreate the bluetooth peer device
356            return _check_device_init(device, operation)
357
358    elif operation == 'reset':
359        # Powercycle the USB port where the bluetooth peer device is connected.
360        # RN-42 and RN-52 share the same vid:pid so both will be powercycled.
361        # This will only work on fizz host with write protection removed.
362        # Note that the state on the device will be lost.
363        return _reset_device_power(device)
364
365    elif operation == 'reboot':
366        # Reboot the chameleon host.
367        # The device is power cycled before rebooting chameleon host
368        return _reboot_chameleon(chameleon, device)
369
370    else:
371        logging.error('fix_serial_device Invalid operation %s', operation)
372        return False
373
374
375def retry(test_method, instance, *args, **kwargs):
376    """Execute the target facade test_method(). Retry if failing the first time.
377
378    A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
379    e.g., BluetoothAdapterTests.test_bluetoothd_running().
380
381    @param test_method: the test method to retry
382
383    @returns: True if the return value of test_method() is successful.
384              False otherwise.
385
386    """
387    if _is_successful(_run_method(test_method, test_method.__name__,
388                                  instance, *args, **kwargs)):
389        return True
390
391    # Try to fix the serial device if applicable.
392    logging.error('%s failed at the 1st time: (%s)', test_method.__name__,
393                  str(instance.results))
394
395    # If this test does not use any attached serial device, just re-run
396    # the test.
397    logging.info('%s: retry the 2nd time.', test_method.__name__)
398    time.sleep(1)
399
400
401    if not hasattr(instance, 'use_chameleon'):
402        return _is_successful(_run_method(test_method, test_method.__name__,
403                                          instance, *args, **kwargs))
404    for device_type in SUPPORTED_DEVICE_TYPES:
405        for device in getattr(instance, 'devices')[device_type]:
406            #fix_serial_device in 'recreate' mode doesn't require chameleon
407            #so just pass None for convinent.
408            if not fix_serial_device(None, device, "recreate"):
409                return False
410
411    logging.info('%s: retry the 2nd time.', test_method.__name__)
412    return _is_successful(_run_method(test_method, test_method.__name__,
413                                      instance, *args, **kwargs))
414
415
416def _test_retry_and_log(test_method_or_retry_flag):
417    """A decorator that logs test results, collects error messages, and retries
418       on request.
419
420    @param test_method_or_retry_flag: either the test_method or a retry_flag.
421        There are some possibilities of this argument:
422        1. the test_method to conduct and retry: should retry the test_method.
423            This occurs with
424            @_test_retry_and_log
425        2. the retry flag is True. Should retry the test_method.
426            This occurs with
427            @_test_retry_and_log(True)
428        3. the retry flag is False. Do not retry the test_method.
429            This occurs with
430            @_test_retry_and_log(False)
431
432    @returns: a wrapper of the test_method with test log. The retry mechanism
433        would depend on the retry flag.
434
435    """
436
437    def decorator(test_method):
438        """A decorator wrapper of the decorated test_method.
439
440        @param test_method: the test method being decorated.
441
442        @returns the wrapper of the test method.
443
444        """
445        @functools.wraps(test_method)
446        def wrapper(instance, *args, **kwargs):
447            """A wrapper of the decorated method.
448
449            @param instance: an BluetoothAdapterTests instance
450
451            @returns the result of the test method
452
453            """
454            instance.results = None
455            if callable(test_method_or_retry_flag) or test_method_or_retry_flag:
456                test_result = retry(test_method, instance, *args, **kwargs)
457            else:
458                test_result = test_method(instance, *args, **kwargs)
459
460            if test_result:
461                logging.info('[*** passed: %s]', test_method.__name__)
462            else:
463                fail_msg = '[--- failed: %s (%s)]' % (test_method.__name__,
464                                                      str(instance.results))
465                logging.error(fail_msg)
466                instance.fails.append(fail_msg)
467            return test_result
468        return wrapper
469
470    if callable(test_method_or_retry_flag):
471        # If the decorator function comes with no argument like
472        # @_test_retry_and_log
473        return decorator(test_method_or_retry_flag)
474    else:
475        # If the decorator function comes with an argument like
476        # @_test_retry_and_log(False)
477        return decorator
478
479
480def test_case_log(method):
481    """A decorator for test case methods.
482
483    The main purpose of this decorator is to display the test case name
484    in the test log which looks like
485
486        <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
487
488    @param method: the test case method to decorate.
489
490    @returns: a wrapper function of the decorated method.
491
492    """
493    @functools.wraps(method)
494    def wrapper(instance, *args, **kwargs):
495        """Log the name of the wrapped method before execution"""
496        logging.info('\n<... %s ...>', method.__name__)
497        method(instance, *args, **kwargs)
498    return wrapper
499
500
501class BluetoothAdapterTests(test.test):
502    """Server side bluetooth adapter tests.
503
504    This test class tries to thoroughly verify most of the important work
505    states of a bluetooth adapter.
506
507    The various test methods are supposed to be invoked by actual autotest
508    tests such as server/cros/site_tests/bluetooth_Adapter*.
509
510    """
511    version = 1
512    ADAPTER_ACTION_SLEEP_SECS = 1
513    ADAPTER_PAIRING_TIMEOUT_SECS = 60
514    ADAPTER_CONNECTION_TIMEOUT_SECS = 30
515    ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
516    ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
517    ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
518    ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
519    ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
520
521    ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
522    ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1
523
524    HID_REPORT_SLEEP_SECS = 1
525
526
527    DEFAULT_START_DELAY_SECS = 0
528    DEFAULT_HOLD_INTERVAL_SECS = 10
529    DEFAULT_HOLD_TIMEOUT_SECS = 60
530    DEFAULT_HOLD_SLEEP_SECS = 1
531
532    # Default suspend time in seconds for suspend resume.
533    SUSPEND_TIME_SECS=10
534
535    # hci0 is the default hci device if there is no external bluetooth dongle.
536    EXPECTED_HCI = 'hci0'
537
538    CLASS_OF_SERVICE_MASK = 0xFFE000
539    CLASS_OF_DEVICE_MASK = 0x001FFF
540
541    # Constants about advertising.
542    DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280
543    DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280
544    ADVERTISING_INTERVAL_UNIT = 0.625
545
546    # Error messages about advertising dbus methods.
547    ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
548            'org.bluez.Error.Failed: Failed to register advertisement')
549    ERROR_INVALID_ADVERTISING_INTERVALS = (
550            'org.bluez.Error.InvalidArguments: Invalid arguments')
551
552    # Supported profiles by chrome os.
553    SUPPORTED_UUIDS = {
554            'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb',
555            'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
556            'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
557            'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
558            'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
559            'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}
560
561    # Board list for name/ID test check. These devices don't need to be tested
562    REFERENCE_BOARDS = ['rambi', 'nyan', 'oak', 'reef', 'yorp', 'bip']
563
564    # Path for btmon logs
565    BTMON_DIR_LOG_PATH = '/var/log/btmon'
566
567    def group_chameleons_type(self):
568        """Group all chameleons by the type of their detected device."""
569
570        # Use previously created chameleon_group instead of creating new
571        if len(self.chameleon_group_copy) > 0:
572            logging.info('Using previously created chameleon group')
573            for device_type in SUPPORTED_DEVICE_TYPES:
574                self.chameleon_group[device_type] = \
575                    self.chameleon_group_copy[device_type][:]
576            return
577
578        # Create new chameleon_group
579        for device_type in SUPPORTED_DEVICE_TYPES:
580            self.chameleon_group[device_type] = list()
581            # Create copy of chameleon_group
582            self.chameleon_group_copy[device_type] = list()
583
584        for idx,chameleon in enumerate(self.host.chameleon_list):
585            for device_type,gen_device_func in SUPPORTED_DEVICE_TYPES.items():
586                try:
587                    device = gen_device_func(chameleon)()
588                    if device.CheckSerialConnection():
589                        self.chameleon_group[device_type].append(chameleon)
590                        logging.info('%d-th chameleon find device %s', \
591                                     idx, device_type)
592                        # Create copy of chameleon_group
593                        self.chameleon_group_copy[device_type].append(chameleon)
594                except:
595                    logging.debug('Error with initializing %s on %d-th'
596                                  'chameleon', device_type, idx)
597            if len(self.chameleon_group[device_type]) == 0:
598                logging.error('No device is detected on %d-th chameleon', idx)
599
600
601    def wait_for_device(self, device):
602        """Waits for device to become available again
603
604        We reset raspberry pi peer between tests. This method helps us wait to
605        prevent us from trying to use the device before it comes back up again.
606
607        @param device: proxy object of peripheral device
608        """
609
610        def is_device_ready():
611            """Tries to use a service of the device
612
613            @returns: True if device is available to provide service
614                      False otherwise
615            """
616
617            try:
618                # Call a simple (fast) function to determine if device is online
619                # and reachable. If we can query this property, we know the
620                # device is available for us to use
621                getattr(device, 'GetCapabilities')()
622
623            except Exception as e:
624                return False
625
626            return True
627
628
629        try:
630            utils.poll_for_condition(condition=is_device_ready,
631                                     desc='wait_for_device')
632
633        except utils.TimeoutError as e:
634            raise error.TestError('Peer is not available after waiting')
635
636
637    def clear_raspi_device(self, device):
638        """Clears a device on a raspi chameleon by resetting bluetooth stack
639
640        @param device: proxy object of peripheral device
641        """
642
643        try:
644            device.ResetStack()
645
646        except SocketError as e:
647            # Ignore conn reset, expected during stack reset
648            if e.errno != errno.ECONNRESET:
649                raise
650
651        except chameleon.ChameleonConnectionError as e:
652            # Ignore chameleon conn reset, expected during stack reset
653            if str(errno.ECONNRESET) not in str(e):
654                raise
655
656        except httplib.BadStatusLine as e:
657            # BadStatusLine occurs occasionally when chameleon
658            # is restarted. We ignore it here
659            logging.error('Ignoring badstatusline exception')
660            pass
661
662        # Catch generic Fault exception by rpc server, ignore
663        # method not available as it indicates platform didn't
664        # support method and that's ok
665        except Exception, e:
666            if not (e.__class__.__name__ == 'Fault' and
667                'is not supported' in str(e)):
668                raise
669
670        # Ensure device is back online before continuing
671        self.wait_for_device(device)
672
673
674    def get_device_rasp(self, device_num, on_start=True):
675        """Get all bluetooth device objects from chameleons.
676        This method should be called only after group_chameleons_type
677        @param device_num : dict of {device_type:number}, to specify the number
678                            of device needed for each device_type.
679
680        @param on_start: boolean describing whether the requested clear is for a
681                            new test, or in the middle of a current one
682
683        @returns: True if Success.
684        """
685
686        for device_type, number in device_num.items():
687            if len(self.chameleon_group[device_type]) < number:
688                logging.error('Number of chameleon with device type'
689                      '%s is %d, which is less then needed %d', device_type,
690                      len(self.chameleon_group[device_type]), number)
691                return False
692
693            for chameleon in self.chameleon_group[device_type][:number]:
694                device = get_bluetooth_emulated_device(chameleon, device_type)
695
696                # Re-fresh device to clean state if test is starting
697                if on_start:
698                    self.clear_raspi_device(device)
699
700                try:
701                    # Tell generic chameleon to bind to this device type
702                    device.SpecifyDeviceType(device_type)
703
704                # Catch generic Fault exception by rpc server, ignore method not
705                # available as it indicates platform didn't support method and
706                # that's ok
707                except Exception, e:
708                    if not (e.__class__.__name__ == 'Fault' and
709                        'is not supported' in str(e)):
710                        raise
711
712                self.devices[device_type].append(device)
713
714                # Remove this chameleon from chameleon_group since it is already
715                # configured as a specific device
716                for temp_device in SUPPORTED_DEVICE_TYPES:
717                    if chameleon in self.chameleon_group[temp_device]:
718                        self.chameleon_group[temp_device].remove(chameleon)
719
720        return True
721
722
723    def get_device(self, device_type, on_start=True):
724        """Get the bluetooth device object.
725
726        @param device_type : the bluetooth device type, e.g., 'MOUSE'
727
728        @param on_start: boolean describing whether the requested clear is for a
729                            new test, or in the middle of a current one
730
731        @returns: the bluetooth device object
732
733        """
734        self.devices[device_type].append(get_bluetooth_emulated_device(\
735                                    self.host.chameleon, device_type))
736
737        # Re-fresh device to clean state if test is starting
738        if on_start:
739            self.clear_raspi_device(self.devices[device_type][-1])
740
741        try:
742            # Tell generic chameleon to bind to this device type
743            self.devices[device_type][-1].SpecifyDeviceType(device_type)
744
745        # Catch generic Fault exception by rpc server, ignore method not
746        # available as it indicates platform didn't support method and that's
747        # ok
748        except Exception, e:
749            if not (e.__class__.__name__ == 'Fault' and
750                'is not supported' in str(e)):
751                raise
752
753        return self.devices[device_type][-1]
754
755
756    def is_device_available(self, chameleon, device_type):
757        """Determines if the named device is available on the linked chameleon
758
759        @param device_type: the bluetooth HID device type, e.g., 'MOUSE'
760
761        @returns: True if it is able to resolve the device, false otherwise
762        """
763
764        device = SUPPORTED_DEVICE_TYPES[device_type](chameleon)()
765        try:
766            # The proxy prevents us from checking if the object is None directly
767            # so instead we call a fast method that any peripheral must support.
768            # This will fail if the object over the proxy doesn't exist
769            getattr(device, 'GetCapabilities')()
770
771        except Exception as e:
772            return False
773
774        return True
775
776
777    def list_devices_available(self):
778        """Queries which devices are available on chameleon/s
779
780        @returns: dict mapping HID device types to number of supporting peers
781                  available, e.g. {'MOUSE':1, 'KEYBOARD':1}
782        """
783        devices_available = {}
784        for device_type in SUPPORTED_DEVICE_TYPES:
785            for chameleon in self.host.chameleon_list:
786                if self.is_device_available(chameleon, device_type):
787                    devices_available[device_type] = \
788                        devices_available.get(device_type, 0) + 1
789
790        return devices_available
791
792
793    def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
794        """Suspend the DUT for a while and then resume.
795
796        @param suspend_time: the suspend time in secs
797
798        """
799        logging.info('The DUT suspends for %d seconds...', suspend_time)
800        try:
801            self.host.suspend(suspend_time=suspend_time)
802        except error.AutoservSuspendError:
803            logging.error('The DUT did not suspend for %d seconds', suspend_time)
804            pass
805        logging.info('The DUT is waken up.')
806
807
808    def reboot(self):
809        """Reboot the DUT and recreate necessary processes and variables"""
810        self.host.reboot()
811
812        # We need to recreate the bluetooth_facade after a reboot.
813        # Delete the proxy first so it won't delete the old one, which
814        # invokes disconnection, after creating the new one.
815        del self.factory
816        del self.bluetooth_facade
817        del self.input_facade
818        self.factory = remote_facade_factory.RemoteFacadeFactory(self.host,
819                       disable_arc=True)
820        self.bluetooth_facade = self.factory.create_bluetooth_hid_facade()
821        self.input_facade = self.factory.create_input_facade()
822
823        # Re-enable debugging verbose since Chrome will set it to
824        # default(disable).
825        self.enable_disable_debug_log(enable=True)
826
827        self.start_new_btmon()
828
829
830    def _wait_till_condition_holds(self, func, method_name,
831                                   timeout=DEFAULT_HOLD_TIMEOUT_SECS,
832                                   sleep_interval=DEFAULT_HOLD_SLEEP_SECS,
833                                   hold_interval=DEFAULT_HOLD_INTERVAL_SECS,
834                                   start_delay=DEFAULT_START_DELAY_SECS):
835        """ Wait for the func() to hold true for a period of time
836
837
838        @param func: the function to wait for.
839        @param method_name: the invoking class method.
840        @param timeout: number of seconds to wait before giving up.
841        @param sleep_interval: the interval in seconds to sleep between
842                invoking func().
843        @param hold_interval: the interval in seconds for the condition to
844                             remain true
845        @param start_delay: interval in seconds to wait before starting
846
847        @returns: True if the condition is met,
848                  False otherwise
849
850        """
851        if start_delay > 0:
852            logging.debug('waiting for %s secs before checking %s',start_delay,
853                          method_name)
854            time.sleep(start_delay)
855
856        try:
857            utils.poll_till_condition_holds(condition=func,
858                                            timeout=timeout,
859                                            sleep_interval=sleep_interval,
860                                            hold_interval = hold_interval,
861                                            desc=('Waiting %s' % method_name))
862            return True
863        except utils.TimeoutError as e:
864            logging.error('%s: %s', method_name, e)
865        except Exception as e:
866            logging.error('%s: %s', method_name, e)
867            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
868            logging.error(err)
869        except:
870            logging.error('%s: unexpected error', method_name)
871        return False
872
873
874    def _wait_for_condition(self, func, method_name,
875                            timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
876                            sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS,
877                            start_delay=DEFAULT_START_DELAY_SECS):
878        """Wait for the func() to become True.
879
880        @param func: the function to wait for.
881        @param method_name: the invoking class method.
882        @param timeout: number of seconds to wait before giving up.
883        @param sleep_interval: the interval in seconds to sleep between
884                invoking func().
885        @param start_delay: interval in seconds to wait before starting
886
887        @returns: True if the condition is met,
888                  False otherwise
889
890        """
891
892        if start_delay > 0:
893            logging.debug('waiting for %s secs before checking %s',start_delay,
894                          method_name)
895            time.sleep(start_delay)
896
897        try:
898            utils.poll_for_condition(condition=func,
899                                     timeout=timeout,
900                                     sleep_interval=sleep_interval,
901                                     desc=('Waiting %s' % method_name))
902            return True
903        except utils.TimeoutError as e:
904            logging.error('%s: %s', method_name, e)
905        except Exception as e:
906            logging.error('%s: %s', method_name, e)
907            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
908            logging.error(err)
909        except:
910            logging.error('%s: unexpected error', method_name)
911        return False
912
913    def ignore_failure(instance, test_method, *args, **kwargs):
914        """ Wrapper to prevent a test_method failure from failing the test batch
915
916        Sometimes a test method needs to be used as a normal function, for its
917        result. This wrapper prevent test_method failure being recorded in
918        instance.fails and causing a failure of the quick test batch.
919
920        @param test_method: test_method
921        @returns: result of the test_method
922        """
923
924        original_fails = instance.fails[:]
925        test_result = test_method(*args, **kwargs)
926        if not test_result:
927            logging.info("%s failure is ignored",test_method.__name__)
928            instance.fails = original_fails
929        return test_result
930
931    # -------------------------------------------------------------------
932    # Adater standalone tests
933    # -------------------------------------------------------------------
934
935
936    def enable_disable_debug_log(self, enable):
937        """Enable or disable debug log in DUT
938        @param enable: True to enable all of the debug log,
939                       False to disable all of the debug log.
940        """
941        level = int(enable)
942        self.bluetooth_facade.set_debug_log_levels(level, level, level, level)
943
944
945    def start_new_btmon(self):
946        """ Start a new btmon process and save the log """
947
948        # Kill all btmon process before creating a new one
949        self.host.run('pkill btmon || true')
950
951        # Make sure the directory exists
952        self.host.run('mkdir -p %s' % self.BTMON_DIR_LOG_PATH)
953
954        # Time format. Ex, 2020_02_20_17_52_45
955        now = time.strftime("%Y_%m_%d_%H_%M_%S")
956        file_name = 'btsnoop_%s' % now
957        self.host.run_background('btmon -SAw %s/%s' % (self.BTMON_DIR_LOG_PATH,
958                                                       file_name))
959
960
961    def log_message(self, msg):
962        """ Write a string to log."""
963        self.bluetooth_facade.log_message(msg)
964
965
966    @_test_retry_and_log
967    def test_bluetoothd_running(self):
968        """Test that bluetoothd is running."""
969        return self.bluetooth_facade.is_bluetoothd_running()
970
971
972    @_test_retry_and_log
973    def test_start_bluetoothd(self):
974        """Test that bluetoothd could be started successfully."""
975        return self.bluetooth_facade.start_bluetoothd()
976
977
978    @_test_retry_and_log
979    def test_stop_bluetoothd(self):
980        """Test that bluetoothd could be stopped successfully."""
981        return self.bluetooth_facade.stop_bluetoothd()
982
983
984    @_test_retry_and_log
985    def test_has_adapter(self):
986        """Verify that there is an adapter. This will return True only if both
987        the kernel and bluetooth daemon see the adapter.
988        """
989        return self.bluetooth_facade.has_adapter()
990
991    @_test_retry_and_log
992    def test_adapter_work_state(self):
993        """Test that the bluetooth adapter is in the correct working state.
994
995        This includes that the adapter is detectable, is powered on,
996        and its hci device is hci0.
997        """
998        has_adapter = self.bluetooth_facade.has_adapter()
999        is_powered_on = self._wait_for_condition(
1000                self.bluetooth_facade.is_powered_on, method_name())
1001        hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
1002        self.results = {
1003                'has_adapter': has_adapter,
1004                'is_powered_on': is_powered_on,
1005                'hci': hci}
1006        return all(self.results.values())
1007
1008
1009    @_test_retry_and_log
1010    def test_power_on_adapter(self):
1011        """Test that the adapter could be powered on successfully."""
1012        power_on = self.bluetooth_facade.set_powered(True)
1013        is_powered_on = self._wait_for_condition(
1014                self.bluetooth_facade.is_powered_on, method_name())
1015
1016        self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
1017        return all(self.results.values())
1018
1019
1020    @_test_retry_and_log
1021    def test_power_off_adapter(self):
1022        """Test that the adapter could be powered off successfully."""
1023        power_off = self.bluetooth_facade.set_powered(False)
1024        is_powered_off = self._wait_for_condition(
1025                lambda: not self.bluetooth_facade.is_powered_on(),
1026                method_name())
1027
1028        self.results = {
1029                'power_off': power_off,
1030                'is_powered_off': is_powered_off}
1031        return all(self.results.values())
1032
1033
1034    @_test_retry_and_log
1035    def test_reset_on_adapter(self):
1036        """Test that the adapter could be reset on successfully.
1037
1038        This includes restarting bluetoothd, and removing the settings
1039        and cached devices.
1040        """
1041        reset_on = self.bluetooth_facade.reset_on()
1042        is_powered_on = self._wait_for_condition(
1043                self.bluetooth_facade.is_powered_on, method_name())
1044
1045        self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
1046        return all(self.results.values())
1047
1048
1049    @_test_retry_and_log
1050    def test_reset_off_adapter(self):
1051        """Test that the adapter could be reset off successfully.
1052
1053        This includes restarting bluetoothd, and removing the settings
1054        and cached devices.
1055        """
1056        reset_off = self.bluetooth_facade.reset_off()
1057        is_powered_off = self._wait_for_condition(
1058                lambda: not self.bluetooth_facade.is_powered_on(),
1059                method_name())
1060
1061        self.results = {
1062                'reset_off': reset_off,
1063                'is_powered_off': is_powered_off}
1064        return all(self.results.values())
1065
1066
1067    @_test_retry_and_log
1068    def test_UUIDs(self):
1069        """Test that basic profiles are supported."""
1070        adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
1071        self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
1072                        if uuid not in adapter_UUIDs]
1073        return not bool(self.results)
1074
1075
1076    @_test_retry_and_log
1077    def test_start_discovery(self):
1078        """Test that the adapter could start discovery."""
1079        start_discovery, _ = self.bluetooth_facade.start_discovery()
1080        is_discovering = self._wait_for_condition(
1081                self.bluetooth_facade.is_discovering, method_name())
1082
1083        self.results = {
1084                'start_discovery': start_discovery,
1085                'is_discovering': is_discovering}
1086        return all(self.results.values())
1087
1088    @_test_retry_and_log(False)
1089    def test_is_discovering(self):
1090        """Test that the adapter is already discovering."""
1091        is_discovering = self._wait_for_condition(
1092                self.bluetooth_facade.is_discovering, method_name())
1093
1094        self.results = {'is_discovering': is_discovering}
1095        return all(self.results.values())
1096
1097    @_test_retry_and_log
1098    def test_stop_discovery(self):
1099        """Test that the adapter could stop discovery."""
1100        stop_discovery, _ = self.bluetooth_facade.stop_discovery()
1101        is_not_discovering = self._wait_for_condition(
1102                lambda: not self.bluetooth_facade.is_discovering(),
1103                method_name())
1104
1105        self.results = {
1106                'stop_discovery': stop_discovery,
1107                'is_not_discovering': is_not_discovering}
1108        return all(self.results.values())
1109
1110
1111    @_test_retry_and_log
1112    def test_discoverable(self):
1113        """Test that the adapter could be set discoverable."""
1114        set_discoverable = self.bluetooth_facade.set_discoverable(True)
1115        is_discoverable = self._wait_for_condition(
1116                self.bluetooth_facade.is_discoverable, method_name())
1117
1118        self.results = {
1119                'set_discoverable': set_discoverable,
1120                'is_discoverable': is_discoverable}
1121        return all(self.results.values())
1122
1123    @_test_retry_and_log(False)
1124    def test_is_discoverable(self):
1125        """Test that the adapter is discoverable."""
1126        is_discoverable = self._wait_for_condition(
1127                self.bluetooth_facade.is_discoverable, method_name())
1128
1129        self.results = {'is_discoverable': is_discoverable}
1130        return all(self.results.values())
1131
1132
1133    def _test_timeout_property(self, set_property, check_property, set_timeout,
1134                              get_timeout, property_name,
1135                              timeout_values = [0, 60, 180]):
1136        """Common method to test (Discoverable/Pairable)Timeout property.
1137
1138        This is used to test
1139        - DiscoverableTimeout property
1140        - PairableTimeout property
1141
1142        The test performs the following
1143           - Set PropertyTimeout
1144           - Read PropertyTimeout and make sure values match
1145           - Set adapter propety
1146           - In a loop check if property is active
1147           - Test fails property is false before timeout
1148           - Test fails property is True after timeout
1149           Repeat the test for different values for timeout
1150
1151           Note : Value of 0 mean it never timeouts, so the test will
1152                 end after 30 seconds.
1153        """
1154        def check_timeout(timeout):
1155            """Check for timeout value in loop while recording failures."""
1156            actual_timeout = get_timeout()
1157            if timeout != actual_timeout:
1158                logging.debug('%s timeout value read %s does not '
1159                              'match value set %s, yet', property_name,
1160                              actual_timeout, timeout)
1161                return False
1162            else:
1163                return True
1164
1165        def _test_timeout_property(timeout):
1166            # minium time after timeout before checking property
1167            MIN_DELTA_SECS = 3
1168            # Time between checking  property
1169            WAIT_TIME_SECS = 2
1170
1171            # Set and read back the timeout value
1172            if not set_timeout(timeout):
1173                logging.error('Setting the %s timeout failed',property_name)
1174                return False
1175
1176
1177            if not self._wait_for_condition(lambda : check_timeout(timeout),
1178                                            'check_'+property_name):
1179                logging.error('checking %s_timeout value timed out',
1180                              property_name)
1181                return False
1182
1183            #
1184            # Check that the timeout works
1185            # Check property is true until timeout
1186            # and then it is not
1187
1188            property_set = set_property(True)
1189            property_is_true = self._wait_for_condition(check_property,
1190                                                        method_name())
1191
1192            self.results = { 'set_%s' % property_name : property_set,
1193                             'is_%s' % property_name: property_is_true}
1194            logging.debug(self.results)
1195
1196            if not all(self.results.values()):
1197                logging.error('Setting %s failed',property_name)
1198                return False
1199
1200            start_time = time.time()
1201            while True:
1202                time.sleep(WAIT_TIME_SECS)
1203                cur_time = time.time()
1204                property_set = check_property()
1205                time_elapsed = cur_time - start_time
1206
1207                # Ignore check_property results made near the timeout
1208                # to avoid spurious failures.
1209                if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS:
1210                    continue
1211
1212                # Timeout of zero seconds mean that the adapter never times out
1213                # Check for 30 seconds and then exit the test.
1214                if timeout == 0:
1215                    if not property_set:
1216                        logging.error('Adapter is not %s after %.2f '
1217                                      'secs with a timeout of zero ',
1218                                      property_name, time_elapsed)
1219                        return False
1220                    elif time_elapsed > 30:
1221                        logging.debug('Adapter %s after %.2f seconds '
1222                                      'with timeout of zero as expected' ,
1223                                      property_name, time_elapsed)
1224                        return True
1225                    continue
1226
1227                #
1228                # Check if property is true till timeout ends and
1229                # false afterwards
1230                #
1231                if time_elapsed < timeout:
1232                    if not property_set:
1233                        logging.error('Adapter is not %s after %.2f '
1234                                      'secs before timeout of %.2f',
1235                                      property_name, time_elapsed, timeout)
1236                        return False
1237                else:
1238                    if property_set:
1239                        logging.error('Adapter is still %s after '
1240                                      ' %.2f secs with timeout of %.2f',
1241                                      property_name, time_elapsed, timeout)
1242                        return False
1243                    else:
1244                        logging.debug('Adapter not %s after %.2f '
1245                                      'secs with timeout of %.2f as expected ',
1246                                      property_name, time_elapsed, timeout)
1247                        return True
1248
1249        default_value = check_property()
1250        default_timeout = get_timeout()
1251
1252        result = []
1253        try:
1254            for timeout in timeout_values:
1255                result.append(_test_timeout_property(timeout))
1256            logging.debug("Test returning %s", all(result))
1257            return all(result)
1258        except:
1259            logging.error("exception in test_%s_timeout",property_name)
1260            raise
1261        finally:
1262            # Set the property back to default value permanently before
1263            # exiting the test
1264            set_timeout(0)
1265            set_property(default_value)
1266            # Set the timeout back to default value before exiting the test
1267            set_timeout(default_timeout)
1268
1269
1270    @_test_retry_and_log
1271    def test_discoverable_timeout(self, timeout_values = [0, 60, 180]):
1272        """Test adapter dbus property DiscoverableTimeout."""
1273        return self._test_timeout_property(
1274            set_property = self.bluetooth_facade.set_discoverable,
1275            check_property = self.bluetooth_facade.is_discoverable,
1276            set_timeout = self.bluetooth_facade.set_discoverable_timeout,
1277            get_timeout = self.bluetooth_facade.get_discoverable_timeout,
1278            property_name = 'discoverable',
1279            timeout_values = timeout_values)
1280
1281    @_test_retry_and_log
1282    def test_pairable_timeout(self, timeout_values = [0, 60, 180]):
1283        """Test adapter dbus property PairableTimeout."""
1284        return self._test_timeout_property(
1285            set_property = self.bluetooth_facade.set_pairable,
1286            check_property = self.bluetooth_facade.is_pairable,
1287            set_timeout = self.bluetooth_facade.set_pairable_timeout,
1288            get_timeout = self.bluetooth_facade.get_pairable_timeout,
1289            property_name = 'pairable',
1290            timeout_values = timeout_values)
1291
1292
1293    @_test_retry_and_log
1294    def test_nondiscoverable(self):
1295        """Test that the adapter could be set non-discoverable."""
1296        set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
1297        is_nondiscoverable = self._wait_for_condition(
1298                lambda: not self.bluetooth_facade.is_discoverable(),
1299                method_name())
1300
1301        self.results = {
1302                'set_nondiscoverable': set_nondiscoverable,
1303                'is_nondiscoverable': is_nondiscoverable}
1304        return all(self.results.values())
1305
1306
1307    @_test_retry_and_log
1308    def test_pairable(self):
1309        """Test that the adapter could be set pairable."""
1310        set_pairable = self.bluetooth_facade.set_pairable(True)
1311        is_pairable = self._wait_for_condition(
1312                self.bluetooth_facade.is_pairable, method_name())
1313
1314        self.results = {
1315                'set_pairable': set_pairable,
1316                'is_pairable': is_pairable}
1317        return all(self.results.values())
1318
1319
1320    @_test_retry_and_log
1321    def test_nonpairable(self):
1322        """Test that the adapter could be set non-pairable."""
1323        set_nonpairable = self.bluetooth_facade.set_pairable(False)
1324        is_nonpairable = self._wait_for_condition(
1325                lambda: not self.bluetooth_facade.is_pairable(), method_name())
1326
1327        self.results = {
1328                'set_nonpairable': set_nonpairable,
1329                'is_nonpairable': is_nonpairable}
1330        return all(self.results.values())
1331
1332
1333    @_test_retry_and_log(False)
1334    def test_check_valid_adapter_id(self):
1335        """Fail if the Bluetooth ID is not in the correct format.
1336
1337        @returns True if adapter ID follows expected format, False otherwise
1338        """
1339
1340        # Boards which only support bluetooth version 3 and below
1341        BLUETOOTH_3_BOARDS = ['x86-mario', 'x86-zgb']
1342
1343        device = self.host.get_platform()
1344        adapter_info = self.get_adapter_properties()
1345
1346        # Don't complete test if this is a reference board
1347        if device in self.REFERENCE_BOARDS:
1348            return True
1349
1350        modalias = adapter_info['Modalias']
1351        logging.debug('Saw Bluetooth ID of: %s', modalias)
1352
1353        if device in BLUETOOTH_3_BOARDS:
1354            bt_format = 'bluetooth:v00E0p24..d0300'
1355        else:
1356            bt_format = 'bluetooth:v00E0p24..d0400'
1357
1358        if not re.match(bt_format, modalias):
1359            return False
1360
1361        return True
1362
1363
1364    @_test_retry_and_log(False)
1365    def test_check_valid_alias(self):
1366        """Fail if the Bluetooth alias is not in the correct format.
1367
1368        @returns True if adapter alias follows expected format, False otherwise
1369        """
1370
1371        device = self.host.get_platform()
1372        adapter_info = self.get_adapter_properties()
1373
1374        # Don't complete test if this is a reference board
1375        if device in self.REFERENCE_BOARDS:
1376            return True
1377
1378        alias = adapter_info['Alias']
1379        logging.debug('Saw Bluetooth Alias of: %s', alias)
1380
1381        device_type = self.host.get_board_type().lower()
1382        alias_format = '%s_[a-z0-9]{4}' % device_type
1383        if not re.match(alias_format, alias.lower()):
1384            return False
1385
1386        return True
1387
1388
1389    # -------------------------------------------------------------------
1390    # Tests about general discovering, pairing, and connection
1391    # -------------------------------------------------------------------
1392
1393
1394    @_test_retry_and_log(False)
1395    def test_discover_device(self, device_address):
1396        """Test that the adapter could discover the specified device address.
1397
1398        @param device_address: Address of the device.
1399
1400        @returns: True if the device is found. False otherwise.
1401
1402        """
1403        has_device_initially = False
1404        start_discovery = False
1405        device_discovered = False
1406        has_device = self.bluetooth_facade.has_device
1407
1408        if has_device(device_address):
1409            has_device_initially = True
1410        else:
1411            start_discovery, _ = self.bluetooth_facade.start_discovery()
1412            if start_discovery:
1413                try:
1414                    utils.poll_for_condition(
1415                            condition=(lambda: has_device(device_address)),
1416                            timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
1417                            sleep_interval=
1418                            self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
1419                            desc='Waiting for discovering %s' % device_address)
1420                    device_discovered = True
1421                except utils.TimeoutError as e:
1422                    logging.error('test_discover_device: %s', e)
1423                except Exception as e:
1424                    logging.error('test_discover_device: %s', e)
1425                    err = ('bluetoothd probably crashed.'
1426                           'Check out /var/log/messages')
1427                    logging.error(err)
1428                except:
1429                    logging.error('test_discover_device: unexpected error')
1430
1431        self.results = {
1432                'has_device_initially': has_device_initially,
1433                'start_discovery': start_discovery,
1434                'device_discovered': device_discovered}
1435        return has_device_initially or device_discovered
1436
1437    def _test_discover_by_device(self, device):
1438        device_discovered = device.Discover(self.bluetooth_facade.address)
1439
1440        self.results = {
1441                'device_discovered': device_discovered
1442        }
1443
1444        return all(self.results.values())
1445
1446    @_test_retry_and_log(False)
1447    def test_discover_by_device(self, device):
1448        """Test that the device could discover the adapter address.
1449
1450        @param device: Meta device to represent peer device.
1451
1452        @returns: True if the adapter is found by the device.
1453        """
1454        return self._test_discover_by_device(device)
1455
1456    @_test_retry_and_log(False)
1457    def test_discover_by_device_fails(self, device):
1458        """Test that the device could not discover the adapter address.
1459
1460        @param device: Meta device to represent peer device.
1461
1462        @returns False if the adapter is found by the device.
1463        """
1464        return not self._test_discover_by_device(device)
1465
1466    @_test_retry_and_log(False)
1467    def test_device_set_discoverable(self, device, discoverable):
1468        """Test that we could set the peer device to discoverable. """
1469        try:
1470            device.SetDiscoverable(discoverable)
1471        except:
1472            return False
1473
1474        return True
1475
1476    @_test_retry_and_log
1477    def test_pairing(self, device_address, pin, trusted=True):
1478        """Test that the adapter could pair with the device successfully.
1479
1480        @param device_address: Address of the device.
1481        @param pin: pin code to pair with the device.
1482        @param trusted: indicating whether to set the device trusted.
1483
1484        @returns: True if pairing succeeds. False otherwise.
1485
1486        """
1487
1488        def _pair_device():
1489            """Pair to the device.
1490
1491            @returns: True if it could pair with the device. False otherwise.
1492
1493            """
1494            return self.bluetooth_facade.pair_legacy_device(
1495                    device_address, pin, trusted,
1496                    self.ADAPTER_PAIRING_TIMEOUT_SECS)
1497
1498
1499        def _verify_connection_info():
1500            """Verify that connection info to device is retrievable.
1501
1502            @returns: True if the connection info is retrievable.
1503                      False otherwise.
1504            """
1505            return (self.bluetooth_facade.get_connection_info(device_address)
1506                    is not None)
1507
1508
1509        has_device = False
1510        paired = False
1511        connection_info_retrievable = False
1512        if self.bluetooth_facade.has_device(device_address):
1513            has_device = True
1514            try:
1515                utils.poll_for_condition(
1516                        condition=_pair_device,
1517                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
1518                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1519                        desc='Waiting for pairing %s' % device_address)
1520                paired = True
1521            except utils.TimeoutError as e:
1522                logging.error('test_pairing: %s', e)
1523            except:
1524                logging.error('test_pairing: unexpected error')
1525
1526            connection_info_retrievable = _verify_connection_info()
1527
1528        self.results = {
1529                'has_device': has_device,
1530                'paired': paired,
1531                'connection_info_retrievable': connection_info_retrievable}
1532        return all(self.results.values())
1533
1534
1535    @_test_retry_and_log
1536    def test_remove_pairing(self, device_address):
1537        """Test that the adapter could remove the paired device.
1538
1539        @param device_address: Address of the device.
1540
1541        @returns: True if the device is removed successfully. False otherwise.
1542
1543        """
1544        device_is_paired_initially = self.bluetooth_facade.device_is_paired(
1545                device_address)
1546        remove_pairing = False
1547        pairing_removed = False
1548
1549        if device_is_paired_initially:
1550            remove_pairing = self.bluetooth_facade.remove_device_object(
1551                    device_address)
1552            pairing_removed = not self.bluetooth_facade.device_is_paired(
1553                    device_address)
1554
1555        self.results = {
1556                'device_is_paired_initially': device_is_paired_initially,
1557                'remove_pairing': remove_pairing,
1558                'pairing_removed': pairing_removed}
1559        return all(self.results.values())
1560
1561
1562    def test_set_trusted(self, device_address, trusted=True):
1563        """Test whether the device with the specified address is trusted.
1564
1565        @param device_address: Address of the device.
1566        @param trusted : True or False indicating if trusted is expected.
1567
1568        @returns: True if the device's "Trusted" property is as specified;
1569                  False otherwise.
1570
1571        """
1572
1573        set_trusted = self.bluetooth_facade.set_trusted(
1574                device_address, trusted)
1575
1576        actual_trusted = self.bluetooth_facade.get_device_property(
1577                                device_address, 'Trusted')
1578
1579        self.results = {
1580                'set_trusted': set_trusted,
1581                'actual trusted': actual_trusted,
1582                'expected trusted': trusted}
1583        return actual_trusted == trusted
1584
1585
1586    @_test_retry_and_log
1587    def test_connection_by_adapter(self, device_address):
1588        """Test that the adapter of dut could connect to the device successfully
1589
1590        It is the caller's responsibility to pair to the device before
1591        doing connection.
1592
1593        @param device_address: Address of the device.
1594
1595        @returns: True if connection is performed. False otherwise.
1596
1597        """
1598
1599        def _connect_device():
1600            """Connect to the device.
1601
1602            @returns: True if it could connect to the device. False otherwise.
1603
1604            """
1605            return self.bluetooth_facade.connect_device(device_address)
1606
1607
1608        has_device = False
1609        connected = False
1610        if self.bluetooth_facade.has_device(device_address):
1611            has_device = True
1612            try:
1613                utils.poll_for_condition(
1614                        condition=_connect_device,
1615                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
1616                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1617                        desc='Waiting for connecting to %s' % device_address)
1618                connected = True
1619            except utils.TimeoutError as e:
1620                logging.error('test_connection_by_adapter: %s', e)
1621            except:
1622                logging.error('test_connection_by_adapter: unexpected error')
1623
1624        self.results = {'has_device': has_device, 'connected': connected}
1625        return all(self.results.values())
1626
1627
1628    @_test_retry_and_log
1629    def test_disconnection_by_adapter(self, device_address):
1630        """Test that the adapter of dut could disconnect the device successfully
1631
1632        @param device_address: Address of the device.
1633
1634        @returns: True if disconnection is performed. False otherwise.
1635
1636        """
1637        return self.bluetooth_facade.disconnect_device(device_address)
1638
1639
1640    def _enter_command_mode(self, device):
1641        """Let the device enter command mode.
1642
1643        Before using the device, need to call this method to make sure
1644        it is in the command mode.
1645
1646        @param device: the bluetooth HID device
1647
1648        @returns: True if successful. False otherwise.
1649
1650        """
1651        result = _is_successful(_run_method(device.EnterCommandMode,
1652                                            'EnterCommandMode'))
1653        if not result:
1654            logging.error('EnterCommandMode failed')
1655        return result
1656
1657
1658    @_test_retry_and_log
1659    def test_connection_by_device(self, device):
1660        """Test that the device could connect to the adapter successfully.
1661
1662        This emulates the behavior that a device may initiate a
1663        connection request after waking up from power saving mode.
1664
1665        @param device: the bluetooth HID device
1666
1667        @returns: True if connection is performed correctly by device and
1668                  the adapter also enters connection state.
1669                  False otherwise.
1670
1671        """
1672        if not self._enter_command_mode(device):
1673            return False
1674
1675        method_name = 'test_connection_by_device'
1676        connection_by_device = False
1677        adapter_address = self.bluetooth_facade.address
1678        try:
1679            device.ConnectToRemoteAddress(adapter_address)
1680            connection_by_device = True
1681        except Exception as e:
1682            logging.error('%s (device): %s', method_name, e)
1683        except:
1684            logging.error('%s (device): unexpected error', method_name)
1685
1686        connection_seen_by_adapter = False
1687        device_address = device.address
1688        device_is_connected = self.bluetooth_facade.device_is_connected
1689        try:
1690            utils.poll_for_condition(
1691                    condition=lambda: device_is_connected(device_address),
1692                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
1693                    desc=('Waiting for connection from %s' % device_address))
1694            connection_seen_by_adapter = True
1695        except utils.TimeoutError as e:
1696            logging.error('%s (adapter): %s', method_name, e)
1697        except:
1698            logging.error('%s (adapter): unexpected error', method_name)
1699
1700        self.results = {
1701                'connection_by_device': connection_by_device,
1702                'connection_seen_by_adapter': connection_seen_by_adapter}
1703        return all(self.results.values())
1704
1705    @_test_retry_and_log
1706    def test_connection_by_device_only(self, device, adapter_address):
1707      """Test that the device could connect to adapter successfully.
1708
1709      This is a modified version of test_connection_by_device that only
1710      communicates with the peer device and not the host (in case the host is
1711      suspended for example).
1712
1713      @param device: the bluetooth peer device
1714      @param adapter_address: address of the adapter
1715
1716      @returns: True if the connection was established by the device or False.
1717      """
1718      connected = device.ConnectToRemoteAddress(adapter_address)
1719      self.results = {
1720          'connection_by_device': connected
1721      }
1722
1723      return all(self.results.values())
1724
1725
1726    @_test_retry_and_log
1727    def test_disconnection_by_device(self, device):
1728        """Test that the device could disconnect the adapter successfully.
1729
1730        This emulates the behavior that a device may initiate a
1731        disconnection request before going into power saving mode.
1732
1733        Note: should not try to enter command mode in this method. When
1734              a device is connected, there is no way to enter command mode.
1735              One could just issue a special disconnect command without
1736              entering command mode.
1737
1738        @param device: the bluetooth HID device
1739
1740        @returns: True if disconnection is performed correctly by device and
1741                  the adapter also observes the disconnection.
1742                  False otherwise.
1743
1744        """
1745        method_name = 'test_disconnection_by_device'
1746        disconnection_by_device = False
1747        try:
1748            device.Disconnect()
1749            disconnection_by_device = True
1750        except Exception as e:
1751            logging.error('%s (device): %s', method_name, e)
1752        except:
1753            logging.error('%s (device): unexpected error', method_name)
1754
1755        disconnection_seen_by_adapter = False
1756        device_address = device.address
1757        device_is_connected = self.bluetooth_facade.device_is_connected
1758        try:
1759            utils.poll_for_condition(
1760                    condition=lambda: not device_is_connected(device_address),
1761                    timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
1762                    desc=('Waiting for disconnection from %s' % device_address))
1763            disconnection_seen_by_adapter = True
1764        except utils.TimeoutError as e:
1765            logging.error('%s (adapter): %s', method_name, e)
1766        except:
1767            logging.error('%s (adapter): unexpected error', method_name)
1768
1769        self.results = {
1770                'disconnection_by_device': disconnection_by_device,
1771                'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
1772        return all(self.results.values())
1773
1774
1775    @_test_retry_and_log(False)
1776    def test_device_is_connected(self, device_address):
1777        """Test that device address given is currently connected.
1778
1779        @param device_address: Address of the device.
1780
1781        @returns: True if the device is connected.
1782                  False otherwise.
1783        """
1784
1785        def _is_connected():
1786            """Test if device is connected.
1787
1788            @returns: True if device is connected. False otherwise.
1789
1790            """
1791            return self.bluetooth_facade.device_is_connected(device_address)
1792
1793
1794        method_name = 'test_device_is_connected'
1795        has_device = False
1796        connected = False
1797        if self.bluetooth_facade.has_device(device_address):
1798            has_device = True
1799            try:
1800                utils.poll_for_condition(
1801                        condition=_is_connected,
1802                        timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
1803                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1804                        desc='Waiting to check connection to %s' %
1805                              device_address)
1806                connected = True
1807            except utils.TimeoutError as e:
1808                logging.error('%s: %s', method_name, e)
1809            except:
1810                logging.error('%s: unexpected error', method_name)
1811        self.results = {'has_device': has_device, 'connected': connected}
1812        return all(self.results.values())
1813
1814
1815    @_test_retry_and_log(False)
1816    def test_device_is_not_connected(self, device_address):
1817        """Test that device address given is NOT currently connected.
1818
1819        @param device_address: Address of the device.
1820
1821        @returns: True if the device is NOT connected.
1822                  False otherwise.
1823
1824        """
1825
1826        def _is_not_connected():
1827            """Test if device is not connected.
1828
1829            @returns: True if device is not connected. False otherwise.
1830
1831            """
1832            return not self.bluetooth_facade.device_is_connected(
1833                    device_address)
1834
1835
1836        method_name = 'test_device_is_not_connected'
1837        not_connected = False
1838        if self.bluetooth_facade.has_device(device_address):
1839            try:
1840                utils.poll_for_condition(
1841                        condition=_is_not_connected,
1842                        timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
1843                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1844                        desc='Waiting to check connection to %s' %
1845                              device_address)
1846                not_connected = True
1847            except utils.TimeoutError as e:
1848                logging.error('%s: %s', method_name, e)
1849            except:
1850                logging.error('%s: unexpected error', method_name)
1851                raise
1852        else:
1853            not_connected = True
1854        self.results = {'not_connected': not_connected}
1855        return all(self.results.values())
1856
1857
1858    @_test_retry_and_log
1859    def test_device_is_paired(self, device_address):
1860        """Test that the device address given is currently paired.
1861
1862        @param device_address: Address of the device.
1863
1864        @returns: True if the device is paired.
1865                  False otherwise.
1866
1867        """
1868        def _is_paired():
1869            """Test if device is paired.
1870
1871            @returns: True if device is paired. False otherwise.
1872
1873            """
1874            return self.bluetooth_facade.device_is_paired(device_address)
1875
1876
1877        method_name = 'test_device_is_paired'
1878        has_device = False
1879        paired = False
1880        if self.bluetooth_facade.has_device(device_address):
1881            has_device = True
1882            try:
1883                utils.poll_for_condition(
1884                        condition=_is_paired,
1885                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
1886                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
1887                        desc='Waiting for connection to %s' % device_address)
1888                paired = True
1889            except utils.TimeoutError as e:
1890                logging.error('%s: %s', method_name, e)
1891            except:
1892                logging.error('%s: unexpected error', method_name)
1893        self.results = {'has_device': has_device, 'paired': paired}
1894        return all(self.results.values())
1895
1896
1897    def _get_device_name(self, device_address):
1898        """Get the device name.
1899
1900        @returns: True if the device name is derived. None otherwise.
1901
1902        """
1903
1904        self.discovered_device_name = self.bluetooth_facade.get_device_property(
1905                                device_address, 'Name')
1906
1907        return bool(self.discovered_device_name)
1908
1909
1910    @_test_retry_and_log
1911    def test_device_name(self, device_address, expected_device_name):
1912        """Test that the device name discovered by the adapter is correct.
1913
1914        @param device_address: Address of the device.
1915        @param expected_device_name: the bluetooth device name
1916
1917        @returns: True if the discovered_device_name is expected_device_name.
1918                  False otherwise.
1919
1920        """
1921        try:
1922            utils.poll_for_condition(
1923                    condition=lambda: self._get_device_name(device_address),
1924                    timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
1925                    sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
1926                    desc='Waiting for device name of %s' % device_address)
1927        except utils.TimeoutError as e:
1928            logging.error('test_device_name: %s', e)
1929        except:
1930            logging.error('test_device_name: unexpected error')
1931
1932        self.results = {
1933                'expected_device_name': expected_device_name,
1934                'discovered_device_name': self.discovered_device_name}
1935        return self.discovered_device_name == expected_device_name
1936
1937
1938    @_test_retry_and_log
1939    def test_device_class_of_service(self, device_address,
1940                                     expected_class_of_service):
1941        """Test that the discovered device class of service is as expected.
1942
1943        @param device_address: Address of the device.
1944        @param expected_class_of_service: the expected class of service
1945
1946        @returns: True if the discovered class of service matches the
1947                  expected class of service. False otherwise.
1948
1949        """
1950
1951        device_class = self.bluetooth_facade.get_device_property(device_address,
1952                                                                 'Class')
1953        discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
1954                                       if device_class else None)
1955
1956        self.results = {
1957                'device_class': device_class,
1958                'expected_class_of_service': expected_class_of_service,
1959                'discovered_class_of_service': discovered_class_of_service}
1960        return discovered_class_of_service == expected_class_of_service
1961
1962
1963    @_test_retry_and_log
1964    def test_device_class_of_device(self, device_address,
1965                                    expected_class_of_device):
1966        """Test that the discovered device class of device is as expected.
1967
1968        @param device_address: Address of the device.
1969        @param expected_class_of_device: the expected class of device
1970
1971        @returns: True if the discovered class of device matches the
1972                  expected class of device. False otherwise.
1973
1974        """
1975
1976        device_class = self.bluetooth_facade.get_device_property(device_address,
1977                                                                 'Class')
1978        discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
1979                                      if device_class else None)
1980
1981        self.results = {
1982                'device_class': device_class,
1983                'expected_class_of_device': expected_class_of_device,
1984                'discovered_class_of_device': discovered_class_of_device}
1985        return discovered_class_of_device == expected_class_of_device
1986
1987
1988    def _get_btmon_log(self, method, logging_timespan=1):
1989        """Capture the btmon log when executing the specified method.
1990
1991        @param method: the method to capture log.
1992                       The method would be executed only when it is not None.
1993                       This allows us to simply capture btmon log without
1994                       executing any command.
1995        @param logging_timespan: capture btmon log for logging_timespan seconds.
1996
1997        """
1998        self.bluetooth_le_facade.btmon_start()
1999        self.advertising_msg = method() if method else ''
2000        time.sleep(logging_timespan)
2001        self.bluetooth_le_facade.btmon_stop()
2002
2003
2004    def convert_to_adv_jiffies(self, adv_interval_ms):
2005        """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
2006
2007        @param adv_interval_ms: an advertising interval
2008
2009        @returns: the equivalent jiffies
2010
2011        """
2012        return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT
2013
2014
2015    def compute_duration(self, max_adv_interval_ms):
2016        """Compute duration from max_adv_interval_ms.
2017
2018        Advertising duration is calculated approximately as
2019            duration = max_adv_interval_ms / 1000.0 * 1.1
2020
2021        @param max_adv_interval_ms: max advertising interval in milliseconds.
2022
2023        @returns: duration in seconds.
2024
2025        """
2026        return max_adv_interval_ms / 1000.0 * 1.1
2027
2028
2029    def compute_logging_timespan(self, max_adv_interval_ms):
2030        """Compute the logging timespan from max_adv_interval_ms.
2031
2032        The logging timespan is the time needed to record btmon log.
2033
2034        @param max_adv_interval_ms: max advertising interval in milliseconds.
2035
2036        @returns: logging_timespan in seconds.
2037
2038        """
2039        duration = self.compute_duration(max_adv_interval_ms)
2040        logging_timespan = max(self.count_advertisements * duration, 1)
2041        return logging_timespan
2042
2043
2044    @_test_retry_and_log(False)
2045    def test_check_duration_and_intervals(self, min_adv_interval_ms,
2046                                          max_adv_interval_ms,
2047                                          number_advertisements):
2048        """Verify that every advertisements are scheduled according to the
2049        duration and intervals.
2050
2051        An advertisement would be scheduled at the time span of
2052             duration * number_advertisements
2053
2054        @param min_adv_interval_ms: min advertising interval in milliseconds.
2055        @param max_adv_interval_ms: max advertising interval in milliseconds.
2056        @param number_advertisements: the number of existing advertisements
2057
2058        @returns: True if all advertisements are scheduled based on the
2059                duration and intervals.
2060
2061        """
2062
2063
2064        def within_tolerance(expected, actual, max_error=0.1):
2065            """Determine if the percent error is within specified tolerance.
2066
2067            @param expected: The expected value.
2068            @param actual: The actual (measured) value.
2069            @param max_error: The maximum percent error acceptable.
2070
2071            @returns: True if the percent error is less than or equal to
2072                      max_error.
2073            """
2074            return abs(expected - actual) / abs(expected) <= max_error
2075
2076
2077        start_str = 'Set Advertising Intervals:'
2078        search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
2079        search_str = '|'.join(search_strings)
2080
2081        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
2082                                                      start_str=start_str)
2083
2084        # Company string looks like
2085        #   Company: not assigned (65283)
2086        company_pattern = re.compile('Company:.*\((\d*)\)')
2087
2088        # The string with timestamp looks like
2089        #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
2090        set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
2091        set_adv_time_pattern = re.compile(set_adv_time_str)
2092
2093        adv_timestamps = {}
2094        timestamp = None
2095        manufacturer_id = None
2096        for line in contents:
2097            result = set_adv_time_pattern.search(line)
2098            if result:
2099                timestamp = float(result.group(1))
2100
2101            result = company_pattern.search(line)
2102            if result:
2103                manufacturer_id = '0x%04x' % int(result.group(1))
2104
2105            if timestamp and manufacturer_id:
2106                if manufacturer_id not in adv_timestamps:
2107                    adv_timestamps[manufacturer_id] = []
2108                adv_timestamps[manufacturer_id].append(timestamp)
2109                timestamp = None
2110                manufacturer_id = None
2111
2112        duration = self.compute_duration(max_adv_interval_ms)
2113        expected_timespan = duration * number_advertisements
2114
2115        check_duration = True
2116        for manufacturer_id, values in adv_timestamps.iteritems():
2117            logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
2118            timespans = [values[i] - values[i - 1]
2119                         for i in xrange(1, len(values))]
2120            errors = [timespans[i] for i in xrange(len(timespans))
2121                      if not within_tolerance(expected_timespan, timespans[i])]
2122            logging.debug('timespans: %s', timespans)
2123            logging.debug('errors: %s', errors)
2124            if bool(errors):
2125                check_duration = False
2126
2127        # Verify that the advertising intervals are also correct.
2128        min_adv_interval_ms_found, max_adv_interval_ms_found = (
2129                self._verify_advertising_intervals(min_adv_interval_ms,
2130                                                   max_adv_interval_ms))
2131
2132        self.results = {
2133                'check_duration': check_duration,
2134                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2135                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2136        }
2137        return all(self.results.values())
2138
2139
2140    def _get_min_max_intervals_strings(self, min_adv_interval_ms,
2141                                       max_adv_interval_ms):
2142        """Get the min and max advertising intervals strings shown in btmon.
2143
2144        Advertising intervals shown in the btmon log look like
2145            Min advertising interval: 1280.000 msec (0x0800)
2146            Max advertising interval: 1280.000 msec (0x0800)
2147
2148        @param min_adv_interval_ms: min advertising interval in milliseconds.
2149        @param max_adv_interval_ms: max advertising interval in milliseconds.
2150
2151        @returns: the min and max intervals strings.
2152
2153        """
2154        min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
2155                   (min_adv_interval_ms,
2156                    min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
2157        logging.debug('min_adv_interval_ms: %s', min_str)
2158
2159        max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
2160                   (max_adv_interval_ms,
2161                    max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
2162        logging.debug('max_adv_interval_ms: %s', max_str)
2163
2164        return (min_str, max_str)
2165
2166
2167    def _verify_advertising_intervals(self, min_adv_interval_ms,
2168                                      max_adv_interval_ms):
2169        """Verify min and max advertising intervals.
2170
2171        Advertising intervals look like
2172            Min advertising interval: 1280.000 msec (0x0800)
2173            Max advertising interval: 1280.000 msec (0x0800)
2174
2175        @param min_adv_interval_ms: min advertising interval in milliseconds.
2176        @param max_adv_interval_ms: max advertising interval in milliseconds.
2177
2178        @returns: a tuple of (True, True) if both min and max advertising
2179            intervals could be found. Otherwise, the corresponding element
2180            in the tuple if False.
2181
2182        """
2183        min_str, max_str = self._get_min_max_intervals_strings(
2184                min_adv_interval_ms, max_adv_interval_ms)
2185
2186        min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
2187        max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
2188
2189        return min_adv_interval_ms_found, max_adv_interval_ms_found
2190
2191
2192    @_test_retry_and_log(False)
2193    def test_register_advertisement(self, advertisement_data, instance_id,
2194                                    min_adv_interval_ms, max_adv_interval_ms):
2195        """Verify that an advertisement is registered correctly.
2196
2197        This test verifies the following data:
2198        - advertisement added
2199        - manufacturer data
2200        - service UUIDs
2201        - service data
2202        - advertising intervals
2203        - advertising enabled
2204
2205        @param advertisement_data: the data of an advertisement to register.
2206        @param instance_id: the instance id which starts at 1.
2207        @param min_adv_interval_ms: min_adv_interval in milliseconds.
2208        @param max_adv_interval_ms: max_adv_interval in milliseconds.
2209
2210        @returns: True if the advertisement is registered correctly.
2211                  False otherwise.
2212
2213        """
2214        # When registering a new advertisement, it is possible that another
2215        # instance is advertising. It may need to wait for all other
2216        # advertisements to complete advertising once.
2217        self.count_advertisements += 1
2218        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
2219        self._get_btmon_log(
2220                lambda: self.bluetooth_le_facade.register_advertisement(
2221                        advertisement_data),
2222                logging_timespan=logging_timespan)
2223
2224        # Verify that a new advertisement is added.
2225        advertisement_added = (
2226                self.bluetooth_le_facade.btmon_find('Advertising Added') and
2227                self.bluetooth_le_facade.btmon_find('Instance: %d' %
2228                                                    instance_id))
2229
2230        # Verify that the manufacturer data could be found.
2231        manufacturer_data = advertisement_data.get('ManufacturerData', '')
2232        for manufacturer_id in manufacturer_data:
2233            # The 'not assigned' text below means the manufacturer id
2234            # is not actually assigned to any real manufacturer.
2235            # For real 16-bit manufacturer UUIDs, refer to
2236            #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
2237            manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
2238                    'Company: not assigned (%d)' % int(manufacturer_id, 16))
2239
2240        # Verify that all service UUIDs could be found.
2241        service_uuids_found = True
2242        for uuid in advertisement_data.get('ServiceUUIDs', []):
2243            # Service UUIDs looks like ['0x180D', '0x180F']
2244            #   Heart Rate (0x180D)
2245            #   Battery Service (0x180F)
2246            # For actual 16-bit service UUIDs, refer to
2247            #   https://www.bluetooth.com/specifications/gatt/services
2248            if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
2249                service_uuids_found = False
2250                break
2251
2252        # Verify service data.
2253        service_data_found = True
2254        for uuid, data in advertisement_data.get('ServiceData', {}).items():
2255            # A service data looks like
2256            #   Service Data (UUID 0x9999): 0001020304
2257            # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
2258            data_str = ''.join(map(lambda n: '%02x' % n, data))
2259            if not self.bluetooth_le_facade.btmon_find(
2260                    'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
2261                service_data_found = False
2262                break
2263
2264        # Verify that the advertising intervals are correct.
2265        min_adv_interval_ms_found, max_adv_interval_ms_found = (
2266                self._verify_advertising_intervals(min_adv_interval_ms,
2267                                                   max_adv_interval_ms))
2268
2269        # Verify advertising is enabled.
2270        advertising_enabled = self.bluetooth_le_facade.btmon_find(
2271                'Advertising: Enabled (0x01)')
2272
2273        self.results = {
2274                'advertisement_added': advertisement_added,
2275                'manufacturer_data_found': manufacturer_data_found,
2276                'service_uuids_found': service_uuids_found,
2277                'service_data_found': service_data_found,
2278                'min_adv_interval_ms_found': min_adv_interval_ms_found,
2279                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2280                'advertising_enabled': advertising_enabled,
2281        }
2282        return all(self.results.values())
2283
2284
2285    @_test_retry_and_log(False)
2286    def test_fail_to_register_advertisement(self, advertisement_data,
2287                                            min_adv_interval_ms,
2288                                            max_adv_interval_ms):
2289        """Verify that failure is incurred when max advertisements are reached.
2290
2291        This test verifies that a registration failure is incurred when
2292        max advertisements are reached. The error message looks like:
2293
2294            org.bluez.Error.Failed: Maximum advertisements reached
2295
2296        @param advertisement_data: the advertisement to register.
2297        @param min_adv_interval_ms: min_adv_interval in milliseconds.
2298        @param max_adv_interval_ms: max_adv_interval in milliseconds.
2299
2300        @returns: True if the error message is received correctly.
2301                  False otherwise.
2302
2303        """
2304        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
2305        self._get_btmon_log(
2306                lambda: self.bluetooth_le_facade.register_advertisement(
2307                        advertisement_data),
2308                logging_timespan=logging_timespan)
2309
2310        # Verify that it failed to register advertisement due to the fact
2311        # that max advertisements are reached.
2312        failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT
2313                                    in self.advertising_msg)
2314
2315        # Verify that no new advertisement is added.
2316        advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
2317                'Advertising Added:')
2318
2319        # Verify that the advertising intervals are correct.
2320        min_adv_interval_ms_found, max_adv_interval_ms_found = (
2321                self._verify_advertising_intervals(min_adv_interval_ms,
2322                                                   max_adv_interval_ms))
2323
2324        # Verify advertising remains enabled.
2325        advertising_enabled = self.bluetooth_le_facade.btmon_find(
2326                'Advertising: Enabled (0x01)')
2327
2328        self.results = {
2329                'failed_to_register_error': failed_to_register_error,
2330                'advertisement_not_added': advertisement_not_added,
2331                'min_adv_interval_ms_found': min_adv_interval_ms_found,
2332                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2333                'advertising_enabled': advertising_enabled,
2334        }
2335        return all(self.results.values())
2336
2337
2338    @_test_retry_and_log(False)
2339    def test_unregister_advertisement(self, advertisement_data, instance_id,
2340                                      advertising_disabled):
2341        """Verify that an advertisement is unregistered correctly.
2342
2343        This test verifies the following data:
2344        - advertisement removed
2345        - advertising status: enabled if there are advertisements left;
2346                              disabled otherwise.
2347
2348        @param advertisement_data: the data of an advertisement to unregister.
2349        @param instance_id: the instance id of the advertisement to remove.
2350        @param advertising_disabled: is advertising disabled? This happens
2351                only when all advertisements are removed.
2352
2353        @returns: True if the advertisement is unregistered correctly.
2354                  False otherwise.
2355
2356        """
2357        self.count_advertisements -= 1
2358        self._get_btmon_log(
2359                lambda: self.bluetooth_le_facade.unregister_advertisement(
2360                        advertisement_data))
2361
2362        # Verify that the advertisement is removed.
2363        advertisement_removed = (
2364                self.bluetooth_le_facade.btmon_find('Advertising Removed') and
2365                self.bluetooth_le_facade.btmon_find('Instance: %d' %
2366                                                    instance_id))
2367
2368        # If advertising_disabled is True, there should be no log like
2369        #       'Advertising: Enabled (0x01)'
2370        # If advertising_disabled is False, there should be log like
2371        #       'Advertising: Enabled (0x01)'
2372
2373        # Only need to check advertising status when the last advertisement
2374        # is removed. For any other advertisements prior to the last one,
2375        # we may or may not observe 'Advertising: Enabled (0x01)' message.
2376        # Hence, the test would become flaky if we insist to see that message.
2377        # A possible workaround is to sleep for a while and then check the
2378        # message. The drawback is that we may need to wait up to 10 seconds
2379        # if the advertising duration and intervals are long.
2380        # In a test case, we always run test_check_duration_and_intervals()
2381        # to check if advertising duration and intervals are correct after
2382        # un-registering one or all advertisements, it is safe to do so.
2383        advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
2384                'Advertising: Enabled (0x01)')
2385        advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
2386                'Advertising: Disabled (0x00)')
2387        advertising_status_correct = not advertising_disabled or (
2388                advertising_disabled_found and not advertising_enabled_found)
2389
2390        self.results = {
2391                'advertisement_removed': advertisement_removed,
2392                'advertising_status_correct': advertising_status_correct,
2393        }
2394        return all(self.results.values())
2395
2396
2397    @_test_retry_and_log(False)
2398    def test_set_advertising_intervals(self, min_adv_interval_ms,
2399                                       max_adv_interval_ms):
2400        """Verify that new advertising intervals are set correctly.
2401
2402        Note that setting advertising intervals does not enable/disable
2403        advertising. Hence, there is no need to check the advertising
2404        status.
2405
2406        @param min_adv_interval_ms: the min advertising interval in ms.
2407        @param max_adv_interval_ms: the max advertising interval in ms.
2408
2409        @returns: True if the new advertising intervals are correct.
2410                  False otherwise.
2411
2412        """
2413        self._get_btmon_log(
2414                lambda: self.bluetooth_le_facade.set_advertising_intervals(
2415                        min_adv_interval_ms, max_adv_interval_ms))
2416
2417        # Verify the new advertising intervals.
2418        # With intervals of 200 ms and 200 ms, the log looks like
2419        #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
2420        txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
2421        adv_intervals_found = self.bluetooth_le_facade.btmon_find(
2422                txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
2423                       self.convert_to_adv_jiffies(max_adv_interval_ms)))
2424
2425        self.results = {'adv_intervals_found': adv_intervals_found}
2426        return all(self.results.values())
2427
2428
2429    @_test_retry_and_log(False)
2430    def test_fail_to_set_advertising_intervals(
2431            self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
2432            orig_min_adv_interval_ms, orig_max_adv_interval_ms):
2433        """Verify that setting invalid advertising intervals results in error.
2434
2435        If invalid min/max advertising intervals are given, it would incur
2436        the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
2437        Note that valid advertising intervals fall between 20 ms and 10,240 ms.
2438
2439        @param invalid_min_adv_interval_ms: the invalid min advertising interval
2440                in ms.
2441        @param invalid_max_adv_interval_ms: the invalid max advertising interval
2442                in ms.
2443        @param orig_min_adv_interval_ms: the original min advertising interval
2444                in ms.
2445        @param orig_max_adv_interval_ms: the original max advertising interval
2446                in ms.
2447
2448        @returns: True if it fails to set invalid advertising intervals.
2449                  False otherwise.
2450
2451        """
2452        self._get_btmon_log(
2453                lambda: self.bluetooth_le_facade.set_advertising_intervals(
2454                        invalid_min_adv_interval_ms,
2455                        invalid_max_adv_interval_ms))
2456
2457        # Verify that the invalid error is observed in the dbus error callback
2458        # message.
2459        invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
2460                                   self.advertising_msg)
2461
2462        # Verify that the min/max advertising intervals remain the same
2463        # after setting the invalid advertising intervals.
2464        #
2465        # In btmon log, we would see the following message first.
2466        #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
2467        # And then, we should check if "Min advertising interval" and
2468        # "Max advertising interval" remain the same.
2469        start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
2470                self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
2471                self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
2472
2473        search_strings = ['Min advertising interval:',
2474                          'Max advertising interval:']
2475        search_str = '|'.join(search_strings)
2476
2477        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
2478                                                      start_str=start_str)
2479
2480        # The min/max advertising intervals of all advertisements should remain
2481        # the same as the previous valid ones.
2482        min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
2483        min_max_pattern = re.compile(min_max_str)
2484        correct_orig_min_adv_interval = True
2485        correct_orig_max_adv_interval = True
2486        for line in contents:
2487            result = min_max_pattern.search(line)
2488            if result:
2489                interval = float(result.group(1))
2490                if 'Min' in line and interval != orig_min_adv_interval_ms:
2491                    correct_orig_min_adv_interval = False
2492                elif 'Max' in line and interval != orig_max_adv_interval_ms:
2493                    correct_orig_max_adv_interval = False
2494
2495        self.results = {
2496                'invalid_intervals_error': invalid_intervals_error,
2497                'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
2498                'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
2499
2500        return all(self.results.values())
2501
2502
2503    @_test_retry_and_log(False)
2504    def test_check_advertising_intervals(self, min_adv_interval_ms,
2505                                         max_adv_interval_ms):
2506        """Verify that the advertising intervals are as expected.
2507
2508        @param min_adv_interval_ms: the min advertising interval in ms.
2509        @param max_adv_interval_ms: the max advertising interval in ms.
2510
2511        @returns: True if the advertising intervals are correct.
2512                  False otherwise.
2513
2514        """
2515        self._get_btmon_log(None)
2516
2517        # Verify that the advertising intervals are correct.
2518        min_adv_interval_ms_found, max_adv_interval_ms_found = (
2519                self._verify_advertising_intervals(min_adv_interval_ms,
2520                                                   max_adv_interval_ms))
2521
2522        self.results = {
2523                'min_adv_interval_ms_found': min_adv_interval_ms_found,
2524                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2525        }
2526        return all(self.results.values())
2527
2528
2529    @_test_retry_and_log(False)
2530    def test_reset_advertising(self, instance_ids=[]):
2531        """Verify that advertising is reset correctly.
2532
2533        Note that reset advertising would set advertising intervals to
2534        the default values. However, we would not be able to observe
2535        the values change until new advertisements are registered.
2536        Therefore, it is required that a test_register_advertisement()
2537        test is conducted after this test.
2538
2539        If instance_ids is [], all advertisements would still be removed
2540        if there are any. However, no need to check 'Advertising Removed'
2541        in btmon log since we may or may not be able to observe the message.
2542        This feature is needed if this test is invoked as the first one in
2543        a test case to reset advertising. In this situation, this test does
2544        not know how many advertisements exist.
2545
2546        @param instance_ids: the list of instance IDs that should be removed.
2547
2548        @returns: True if advertising is reset correctly.
2549                  False otherwise.
2550
2551        """
2552        self.count_advertisements = 0
2553        self._get_btmon_log(
2554                lambda: self.bluetooth_le_facade.reset_advertising())
2555
2556        # Verify that every advertisement is removed. When an advertisement
2557        # with instance id 1 is removed, the log looks like
2558        #   Advertising Removed
2559        #       instance: 1
2560        if len(instance_ids) > 0:
2561            advertisement_removed = self.bluetooth_le_facade.btmon_find(
2562                    'Advertising Removed')
2563            if advertisement_removed:
2564                for instance_id in instance_ids:
2565                    txt = 'Instance: %d' % instance_id
2566                    if not self.bluetooth_le_facade.btmon_find(txt):
2567                        advertisement_removed = False
2568                        break
2569        else:
2570            advertisement_removed = True
2571
2572        if not advertisement_removed:
2573            logging.error('Failed to remove advertisement')
2574
2575        # Verify that "Reset Advertising Intervals" command has been issued.
2576        reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
2577                'bluetoothd: Reset Advertising Intervals')
2578
2579        # Verify the advertising is disabled.
2580        advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
2581                'Advertising: Disabled')
2582        # If there are no existing advertisements, we may not observe
2583        # 'Advertising: Disabled'.
2584        advertising_disabled = (instance_ids == [] or
2585                                advertising_disabled_observied)
2586
2587        self.results = {
2588                'advertisement_removed': advertisement_removed,
2589                'reset_advertising_intervals': reset_advertising_intervals,
2590                'advertising_disabled': advertising_disabled,
2591        }
2592        return all(self.results.values())
2593
2594
2595    def add_device(self, address, address_type, action):
2596        """Add a device to the Kernel action list."""
2597        return self.bluetooth_facade.add_device(address, address_type, action)
2598
2599
2600    def remove_device(self, address, address_type):
2601        """Remove a device from the Kernel action list."""
2602        return self.bluetooth_facade.remove_device(address,address_type)
2603
2604
2605    def read_supported_commands(self):
2606        """Read the set of supported commands from the Kernel."""
2607        return self.bluetooth_facade.read_supported_commands()
2608
2609
2610    def read_info(self):
2611        """Read the adapter information from the Kernel."""
2612        return self.bluetooth_facade.read_info()
2613
2614
2615    def get_adapter_properties(self):
2616        """Read the adapter properties from the Bluetooth Daemon."""
2617        return self.bluetooth_facade.get_adapter_properties()
2618
2619
2620    def get_dev_info(self):
2621        """Read raw HCI device information."""
2622        return self.bluetooth_facade.get_dev_info()
2623
2624    def log_settings(self, msg, settings):
2625        """function convert MGMT_OP_READ_INFO settings to string
2626
2627        @param msg: string to include in output
2628        @param settings: bitstring returned by MGMT_OP_READ_INFO
2629        @return : List of strings indicating different settings
2630        """
2631        strs = []
2632        if settings & bluetooth_socket.MGMT_SETTING_POWERED:
2633            strs.append("POWERED")
2634        if settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE:
2635            strs.append("CONNECTABLE")
2636        if settings & bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE:
2637            strs.append("FAST-CONNECTABLE")
2638        if settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE:
2639            strs.append("DISCOVERABLE")
2640        if settings & bluetooth_socket.MGMT_SETTING_PAIRABLE:
2641            strs.append("PAIRABLE")
2642        if settings & bluetooth_socket.MGMT_SETTING_LINK_SECURITY:
2643            strs.append("LINK-SECURITY")
2644        if settings & bluetooth_socket.MGMT_SETTING_SSP:
2645            strs.append("SSP")
2646        if settings & bluetooth_socket.MGMT_SETTING_BREDR:
2647            strs.append("BR/EDR")
2648        if settings & bluetooth_socket.MGMT_SETTING_HS:
2649            strs.append("HS")
2650        if settings & bluetooth_socket.MGMT_SETTING_LE:
2651            strs.append("LE")
2652        logging.debug('%s : %s', msg, " ".join(strs))
2653        return strs
2654
2655    def log_flags(self, msg, flags):
2656        """Function to convert HCI state configuration to a string
2657
2658        @param msg: string to include in output
2659        @param settings: bitstring returned by get_dev_info
2660        @return : List of strings indicating different flags
2661        """
2662        strs = []
2663        if flags & bluetooth_socket.HCI_UP:
2664            strs.append("UP")
2665        else:
2666            strs.append("DOWN")
2667        if flags & bluetooth_socket.HCI_INIT:
2668            strs.append("INIT")
2669        if flags & bluetooth_socket.HCI_RUNNING:
2670            strs.append("RUNNING")
2671        if flags & bluetooth_socket.HCI_PSCAN:
2672            strs.append("PSCAN")
2673        if flags & bluetooth_socket.HCI_ISCAN:
2674            strs.append("ISCAN")
2675        if flags & bluetooth_socket.HCI_AUTH:
2676            strs.append("AUTH")
2677        if flags & bluetooth_socket.HCI_ENCRYPT:
2678            strs.append("ENCRYPT")
2679        if flags & bluetooth_socket.HCI_INQUIRY:
2680            strs.append("INQUIRY")
2681        if flags & bluetooth_socket.HCI_RAW:
2682            strs.append("RAW")
2683        logging.debug('%s [HCI]: %s', msg, " ".join(strs))
2684        return strs
2685
2686
2687    @_test_retry_and_log(False)
2688    def test_service_resolved(self, address):
2689        """Test that the services under device address can be resolved
2690
2691        @param address: MAC address of a device
2692
2693        @returns: True if the ServicesResolved property is changed before
2694                 timeout, False otherwise.
2695
2696        """
2697        is_resolved_func = self.bluetooth_facade.device_services_resolved
2698        return self._wait_for_condition(lambda : is_resolved_func(address),\
2699                                        method_name())
2700
2701
2702    @_test_retry_and_log(False)
2703    def test_gatt_browse(self, address):
2704        """Test that the GATT client can get the attributes correctly
2705
2706        @param address: MAC address of a device
2707
2708        @returns: True if the attribute map received by GATT client is the same
2709                  as expected. False otherwise.
2710
2711        """
2712
2713        gatt_client_facade = GATT_ClientFacade(self.bluetooth_facade)
2714        actual_app = gatt_client_facade.browse(address)
2715        expected_app = GATT_HIDApplication()
2716        diff = GATT_Application.diff(actual_app, expected_app)
2717
2718        self.result = {
2719            'actural_result': actual_app,
2720            'expected_result': expected_app
2721        }
2722
2723        gatt_attribute_hierarchy = ['Device', 'Service', 'Characteristic',
2724                                    'Descriptor']
2725        # Remove any difference in object path
2726        for parent, child in zip(gatt_attribute_hierarchy,
2727                                 gatt_attribute_hierarchy[1:]):
2728            pattern = re.compile('^%s .* is different in %s' % (child, parent))
2729            for diff_str in diff[::]:
2730                if pattern.search(diff_str):
2731                    diff.remove(diff_str)
2732
2733        if len(diff) != 0:
2734            logging.error('Application Diff: %s', diff)
2735            return False
2736        return True
2737
2738
2739    # -------------------------------------------------------------------
2740    # Bluetooth mouse related tests
2741    # -------------------------------------------------------------------
2742
2743
2744    def _record_input_events(self, device, gesture):
2745        """Record the input events.
2746
2747        @param device: the bluetooth HID device.
2748        @param gesture: the gesture method to perform.
2749
2750        @returns: the input events received on the DUT.
2751
2752        """
2753        self.input_facade.initialize_input_recorder(device.name)
2754        self.input_facade.start_input_recorder()
2755        time.sleep(self.HID_REPORT_SLEEP_SECS)
2756        gesture()
2757        time.sleep(self.HID_REPORT_SLEEP_SECS)
2758        self.input_facade.stop_input_recorder()
2759        time.sleep(self.HID_REPORT_SLEEP_SECS)
2760        event_values = self.input_facade.get_input_events()
2761        events = [Event(*ev) for ev in event_values]
2762        return events
2763
2764
2765    def _test_mouse_click(self, device, button):
2766        """Test that the mouse click events could be received correctly.
2767
2768        @param device: the meta device containing a bluetooth HID device
2769        @param button: which button to test, 'LEFT' or 'RIGHT'
2770
2771        @returns: True if the report received by the host matches the
2772                  expected one. False otherwise.
2773
2774        """
2775        if button == 'LEFT':
2776            gesture = device.LeftClick
2777        elif button == 'RIGHT':
2778            gesture = device.RightClick
2779        else:
2780            raise error.TestError('Button (%s) is not valid.' % button)
2781
2782        actual_events = self._record_input_events(device, gesture)
2783
2784        linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
2785        expected_events = [
2786                # Button down
2787                recorder.MSC_SCAN_BTN_EVENT[button],
2788                Event(EV_KEY, linux_input_button[button], 1),
2789                recorder.SYN_EVENT,
2790                # Button up
2791                recorder.MSC_SCAN_BTN_EVENT[button],
2792                Event(EV_KEY, linux_input_button[button], 0),
2793                recorder.SYN_EVENT]
2794
2795        self.results = {
2796                'actual_events': map(str, actual_events),
2797                'expected_events': map(str, expected_events)}
2798        return actual_events == expected_events
2799
2800
2801    @_test_retry_and_log
2802    def test_mouse_left_click(self, device):
2803        """Test that the mouse left click events could be received correctly.
2804
2805        @param device: the meta device containing a bluetooth HID device
2806
2807        @returns: True if the report received by the host matches the
2808                  expected one. False otherwise.
2809
2810        """
2811        return self._test_mouse_click(device, 'LEFT')
2812
2813
2814    @_test_retry_and_log
2815    def test_mouse_right_click(self, device):
2816        """Test that the mouse right click events could be received correctly.
2817
2818        @param device: the meta device containing a bluetooth HID device
2819
2820        @returns: True if the report received by the host matches the
2821                  expected one. False otherwise.
2822
2823        """
2824        return self._test_mouse_click(device, 'RIGHT')
2825
2826
2827    def _test_mouse_move(self, device, delta_x=0, delta_y=0):
2828        """Test that the mouse move events could be received correctly.
2829
2830        @param device: the meta device containing a bluetooth HID device
2831        @param delta_x: the distance to move cursor in x axis
2832        @param delta_y: the distance to move cursor in y axis
2833
2834        @returns: True if the report received by the host matches the
2835                  expected one. False otherwise.
2836
2837        """
2838        gesture = lambda: device.Move(delta_x, delta_y)
2839        actual_events = self._record_input_events(device, gesture)
2840
2841        events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
2842        events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
2843        expected_events = events_x + events_y + [recorder.SYN_EVENT]
2844
2845        self.results = {
2846                'actual_events': map(str, actual_events),
2847                'expected_events': map(str, expected_events)}
2848        return actual_events == expected_events
2849
2850
2851    @_test_retry_and_log
2852    def test_mouse_move_in_x(self, device, delta_x):
2853        """Test that the mouse move events in x could be received correctly.
2854
2855        @param device: the meta device containing a bluetooth HID device
2856        @param delta_x: the distance to move cursor in x axis
2857
2858        @returns: True if the report received by the host matches the
2859                  expected one. False otherwise.
2860
2861        """
2862        return self._test_mouse_move(device, delta_x=delta_x)
2863
2864
2865    @_test_retry_and_log
2866    def test_mouse_move_in_y(self, device, delta_y):
2867        """Test that the mouse move events in y could be received correctly.
2868
2869        @param device: the meta device containing a bluetooth HID device
2870        @param delta_y: the distance to move cursor in y axis
2871
2872        @returns: True if the report received by the host matches the
2873                  expected one. False otherwise.
2874
2875        """
2876        return self._test_mouse_move(device, delta_y=delta_y)
2877
2878
2879    @_test_retry_and_log
2880    def test_mouse_move_in_xy(self, device, delta_x, delta_y):
2881        """Test that the mouse move events could be received correctly.
2882
2883        @param device: the meta device containing a bluetooth HID device
2884        @param delta_x: the distance to move cursor in x axis
2885        @param delta_y: the distance to move cursor in y axis
2886
2887        @returns: True if the report received by the host matches the
2888                  expected one. False otherwise.
2889
2890        """
2891        return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
2892
2893
2894    def _test_mouse_scroll(self, device, units):
2895        """Test that the mouse wheel events could be received correctly.
2896
2897        @param device: the meta device containing a bluetooth HID device
2898        @param units: the units to scroll in y axis
2899
2900        @returns: True if the report received by the host matches the
2901                  expected one. False otherwise.
2902
2903        """
2904        gesture = lambda: device.Scroll(units)
2905        actual_events = self._record_input_events(device, gesture)
2906        expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
2907        self.results = {
2908                'actual_events': map(str, actual_events),
2909                'expected_events': map(str, expected_events)}
2910        return actual_events == expected_events
2911
2912
2913    @_test_retry_and_log
2914    def test_mouse_scroll_down(self, device, delta_y):
2915        """Test that the mouse wheel events could be received correctly.
2916
2917        @param device: the meta device containing a bluetooth HID device
2918        @param delta_y: the units to scroll down in y axis;
2919                        should be a postive value
2920
2921        @returns: True if the report received by the host matches the
2922                  expected one. False otherwise.
2923
2924        """
2925        if delta_y > 0:
2926            return self._test_mouse_scroll(device, delta_y)
2927        else:
2928            raise error.TestError('delta_y (%d) should be a positive value',
2929                                  delta_y)
2930
2931
2932    @_test_retry_and_log
2933    def test_mouse_scroll_up(self, device, delta_y):
2934        """Test that the mouse wheel events could be received correctly.
2935
2936        @param device: the meta device containing a bluetooth HID device
2937        @param delta_y: the units to scroll up in y axis;
2938                        should be a postive value
2939
2940        @returns: True if the report received by the host matches the
2941                  expected one. False otherwise.
2942
2943        """
2944        if delta_y > 0:
2945            return self._test_mouse_scroll(device, -delta_y)
2946        else:
2947            raise error.TestError('delta_y (%d) should be a positive value',
2948                                  delta_y)
2949
2950
2951    @_test_retry_and_log
2952    def test_mouse_click_and_drag(self, device, delta_x, delta_y):
2953        """Test that the mouse click-and-drag events could be received
2954        correctly.
2955
2956        @param device: the meta device containing a bluetooth HID device
2957        @param delta_x: the distance to drag in x axis
2958        @param delta_y: the distance to drag in y axis
2959
2960        @returns: True if the report received by the host matches the
2961                  expected one. False otherwise.
2962
2963        """
2964        gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
2965        actual_events = self._record_input_events(device, gesture)
2966
2967        button = 'LEFT'
2968        expected_events = (
2969                [# Button down
2970                 recorder.MSC_SCAN_BTN_EVENT[button],
2971                 Event(EV_KEY, BTN_LEFT, 1),
2972                 recorder.SYN_EVENT] +
2973                # cursor movement in x and y
2974                ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
2975                ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
2976                [recorder.SYN_EVENT] +
2977                # Button up
2978                [recorder.MSC_SCAN_BTN_EVENT[button],
2979                 Event(EV_KEY, BTN_LEFT, 0),
2980                 recorder.SYN_EVENT])
2981
2982        self.results = {
2983                'actual_events': map(str, actual_events),
2984                'expected_events': map(str, expected_events)}
2985        return actual_events == expected_events
2986
2987
2988    # -------------------------------------------------------------------
2989    # Bluetooth keyboard related tests
2990    # -------------------------------------------------------------------
2991
2992    # TODO may be deprecated as stated in b:140515628
2993    @_test_retry_and_log
2994    def test_keyboard_input_from_string(self, device, string_to_send):
2995        """Test that the keyboard's key events could be received correctly.
2996
2997        @param device: the meta device containing a bluetooth HID device
2998        @param string_to_send: the set of keys that will be pressed one-by-one
2999
3000        @returns: True if the report received by the host matches the
3001                  expected one. False otherwise.
3002
3003        """
3004
3005        gesture = lambda: device.KeyboardSendString(string_to_send)
3006
3007        actual_events = self._record_input_events(device, gesture)
3008
3009        resulting_string = bluetooth_test_utils.reconstruct_string(
3010                           actual_events)
3011
3012        return string_to_send == resulting_string
3013
3014
3015    @_test_retry_and_log
3016    def test_keyboard_input_from_trace(self, device, trace_name):
3017        """ Tests that keyboard events can be transmitted and received correctly
3018
3019        @param device: the meta device containing a bluetooth HID device
3020        @param trace_name: string name for keyboard activity trace to be used
3021                           in the test i.e. "simple_text"
3022
3023        @returns: true if the recorded output matches the expected output
3024                  false otherwise
3025        """
3026
3027        # Read data from trace I/O files
3028        input_trace = bluetooth_test_utils.parse_trace_file(os.path.join(
3029                      TRACE_LOCATION, '{}_input.txt'.format(trace_name)))
3030        output_trace = bluetooth_test_utils.parse_trace_file(os.path.join(
3031                      TRACE_LOCATION, '{}_output.txt'.format(trace_name)))
3032
3033        if not input_trace or not output_trace:
3034            logging.error('Failure in using trace')
3035            return False
3036
3037        # Disregard timing data for now
3038        input_scan_codes = [tup[1] for tup in input_trace]
3039        predicted_events = [Event(*tup[1]) for tup in output_trace]
3040
3041        # Create and run this trace as a gesture
3042        gesture = lambda: device.KeyboardSendTrace(input_scan_codes)
3043        rec_events = self._record_input_events(device, gesture)
3044
3045        # Filter out any input events that were not from the keyboard
3046        rec_key_events = [ev for ev in rec_events if ev.type == EV_KEY]
3047
3048        # Fail if we didn't record the correct number of events
3049        if len(rec_key_events) != len(input_scan_codes):
3050            return False
3051
3052        for idx, predicted in enumerate(predicted_events):
3053            recorded = rec_key_events[idx]
3054
3055            if not predicted == recorded:
3056                return False
3057
3058        return True
3059
3060
3061    def is_newer_kernel_version(self, version, minimum_version):
3062        """ Check if given kernel version is newer than unsupported version."""
3063
3064        return utils.compare_versions(version, minimum_version) >= 0
3065
3066
3067    def is_supported_kernel_version(self, kernel_version, minimum_version,
3068                                    msg=None):
3069        """ Check if kernel version is greater than minimum version.
3070
3071            Check if given kernel version is greater than or equal to minimum
3072            version. Raise TEST_NA if given kernel version is lower than the
3073            minimum version.
3074
3075            Note: Kernel version may have suffixes, so ensure that minimum
3076            version should be the smallest version that is permissible.
3077            Ex: If minimum version is 3.8.11 then 3.8.11-<random> will
3078            pass the check.
3079
3080            @param kernel_version: kernel version to be checked as a string
3081            @param: minimum_version: minimum kernel version requried
3082
3083            @returns: None
3084
3085            @raises: TEST_NA if kernel version is not greater than the minimum
3086                     version
3087        """
3088
3089        logging.debug('kernel version is {} minimum version'
3090                      'is {}'.format(kernel_version,minimum_version))
3091
3092        if msg is None:
3093            msg = 'Test not supported on this kernel version'
3094
3095        if not self.is_newer_kernel_version(kernel_version, minimum_version):
3096            logging.debug('Kernel version check failed. Exiting the test')
3097            raise error.TestNAError(msg)
3098
3099        logging.debug('Kernel version check passed')
3100
3101
3102    # -------------------------------------------------------------------
3103    # Servod related tests
3104    # -------------------------------------------------------------------
3105
3106    @_test_retry_and_log
3107    def test_power_consumption(self, max_power_mw):
3108        """Test the average power consumption."""
3109        power_mw = self.device.servod.MeasurePowerConsumption()
3110        self.results = {'power_mw': power_mw}
3111
3112        if (power_mw is None):
3113            logging.error('Failed to measure power consumption')
3114            return False
3115
3116        power_mw = float(power_mw)
3117        logging.info('power consumption (mw): %f (max allowed: %f)',
3118                     power_mw, max_power_mw)
3119
3120        return power_mw <= max_power_mw
3121
3122
3123    @_test_retry_and_log
3124    def test_start_notify(self, address, uuid, cccd_value):
3125        """Test that a notification can be started on a characteristic
3126
3127        @param address: The MAC address of the remote device.
3128        @param uuid: The uuid of the characteristic.
3129        @param cccd_value: Possible CCCD values include
3130               0x00 - inferred from the remote characteristic's properties
3131               0x01 - notification
3132               0x02 - indication
3133
3134        @returns: The test results.
3135
3136        """
3137        start_notify = self.bluetooth_facade.start_notify(
3138            address, uuid, cccd_value)
3139        is_notifying = self._wait_for_condition(
3140            lambda: self.bluetooth_facade.is_notifying(
3141                address, uuid), method_name())
3142
3143        self.results = {
3144            'start_notify': start_notify,
3145            'is_notifying': is_notifying}
3146
3147        return all(self.results.values())
3148
3149
3150    @_test_retry_and_log
3151    def test_stop_notify(self, address, uuid):
3152        """Test that a notification can be stopped on a characteristic
3153
3154        @param address: The MAC address of the remote device.
3155        @param uuid: The uuid of the characteristic.
3156
3157        @returns: The test results.
3158
3159        """
3160        stop_notify = self.bluetooth_facade.stop_notify(address, uuid)
3161        is_not_notifying = self._wait_for_condition(
3162            lambda: not self.bluetooth_facade.is_notifying(
3163                address, uuid), method_name())
3164
3165        self.results = {
3166            'stop_notify': stop_notify,
3167            'is_not_notifying': is_not_notifying}
3168
3169        return all(self.results.values())
3170
3171
3172    # -------------------------------------------------------------------
3173    # Autotest methods
3174    # -------------------------------------------------------------------
3175
3176
3177    def initialize(self):
3178        """Initialize bluetooth adapter tests."""
3179        # Run through every tests and collect failed tests in self.fails.
3180        self.fails = []
3181
3182        # If a test depends on multiple conditions, write the results of
3183        # the conditions in self.results so that it is easy to know
3184        # what conditions failed by looking at the log.
3185        self.results = None
3186
3187        # Some tests may instantiate a peripheral device for testing.
3188        self.devices = dict()
3189        for device_type in SUPPORTED_DEVICE_TYPES:
3190            self.devices[device_type] = list()
3191
3192        # The count of registered advertisements.
3193        self.count_advertisements = 0
3194
3195
3196    def check_chameleon(self):
3197        """Check the existence of chameleon_host.
3198
3199        The chameleon_host is specified in --args as follows
3200
3201        (cr) $ test_that --args "chameleon_host=$CHAMELEON_IP" "$DUT_IP" <test>
3202
3203        """
3204        logging.debug('labels: %s', self.host.get_labels())
3205        if self.host.chameleon is None:
3206            raise error.TestError('Have to specify chameleon_host IP.')
3207
3208
3209    def run_once(self, *args, **kwargs):
3210        """This method should be implemented by children classes.
3211
3212        Typically, the run_once() method would look like:
3213
3214        factory = remote_facade_factory.RemoteFacadeFactory(host)
3215        self.bluetooth_facade = factory.create_bluetooth_hid_facade()
3216
3217        self.test_bluetoothd_running()
3218        # ...
3219        # invoke more self.test_xxx() tests.
3220        # ...
3221
3222        if self.fails:
3223            raise error.TestFail(self.fails)
3224
3225        """
3226        raise NotImplementedError
3227
3228
3229    def cleanup(self, test_state='END'):
3230        """Clean up bluetooth adapter tests.
3231
3232        @param test_state: string describing the requested clear is for
3233                           a new test(NEW), the middle of the test(MID),
3234                           or the end of the test(END).
3235        """
3236
3237        if test_state is 'END':
3238            # Disable all the bluetooth debug logs
3239            self.enable_disable_debug_log(enable=False)
3240
3241            if hasattr(self, 'host'):
3242                # Stop btmon process
3243                self.host.run('pkill btmon || true')
3244
3245        # Close the device properly if a device is instantiated.
3246        # Note: do not write something like the following statements
3247        #           if self.devices[device_type]:
3248        #       or
3249        #           if bool(self.devices[device_type]):
3250        #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
3251        #       which just does not exist.
3252        for device_name, device_list  in self.devices.items():
3253            for device in device_list:
3254                if device is not None:
3255                    device.Close()
3256
3257                    # Power cycle BT device if we're in the middle of a test
3258                    if test_state is 'MID':
3259                        device.PowerCycle()
3260
3261        self.devices = dict()
3262        for device_type in SUPPORTED_DEVICE_TYPES:
3263            self.devices[device_type] = list()
3264