• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Server side bluetooth adapter subtests."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12from datetime import datetime, timedelta
13import errno
14import functools
15import six.moves.http_client
16import inspect
17import logging
18import multiprocessing
19import os
20import re
21import socket
22import threading
23import time
24
25import common
26from autotest_lib.client.bin import utils
27from autotest_lib.client.bin.input import input_event_recorder as recorder
28from autotest_lib.client.common_lib import error
29from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket
30from autotest_lib.client.cros.chameleon import chameleon
31from autotest_lib.server.cros.bluetooth import bluetooth_test_utils
32from autotest_lib.server import test
33
34from autotest_lib.client.bin.input.linux_input import (
35        BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL,
36        REL_WHEEL_HI_RES, KEY_PLAYCD, KEY_PAUSECD, KEY_STOPCD, KEY_NEXTSONG,
37        KEY_PREVIOUSSONG)
38from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import (
39        GATT_ClientFacade, GATT_Application, GATT_HIDApplication)
40from autotest_lib.server.cros.multimedia import remote_facade_factory
41import six
42from six.moves import map
43from six.moves import range
44from six.moves import zip
45
46
# Short alias for recorder.Event (from input_event_recorder).
Event = recorder.Event

# We have a number of chipsets that are no longer supported. Known issues
# related to firmware will be ignored on these devices (b/169328792).
UNSUPPORTED_CHIPSETS = ['MVL-8897', 'MVL-8997', 'Intel-AC7260', 'Intel-AC7265']

# Location of data traces relative to this (bluetooth_adapter_tests.py) file
BT_ADAPTER_TEST_PATH = os.path.dirname(__file__)
TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard')

# NOTE(review): the meaning and units of this constant are not documented
# here; presumably a time offset (in seconds) applied around resume --
# confirm against the code that uses it.
RESUME_DELTA = -5
58
# Delay binding the methods since host is only available at run time.
# Each value takes the Bluetooth peer object and returns the peer's factory
# method for that device type; calling the returned method creates the
# emulated device.
SUPPORTED_DEVICE_TYPES = {
        'MOUSE': lambda btpeer: btpeer.get_bluetooth_hid_mouse,
        'KEYBOARD': lambda btpeer: btpeer.get_bluetooth_hid_keyboard,
        'BLE_MOUSE': lambda btpeer: btpeer.get_ble_mouse,
        'BLE_KEYBOARD': lambda btpeer: btpeer.get_ble_keyboard,
        # Tester allows us to test DUT's discoverability, etc. from a peer
        'BLUETOOTH_TESTER': lambda btpeer: btpeer.get_bluetooth_tester,
        # This is a base object that does not emulate any Bluetooth device.
        # This object is preferred when only a pure XMLRPC server is needed
        # on the btpeer host, e.g., to perform servod methods.
        'BLUETOOTH_BASE': lambda btpeer: btpeer.get_bluetooth_base,
        # A phone device that supports Bluetooth
        'BLE_PHONE': lambda btpeer: btpeer.get_ble_phone,
        # A Bluetooth audio device emulating a headphone
        'BLUETOOTH_AUDIO': lambda btpeer: btpeer.get_bluetooth_audio,
        # A Bluetooth device that implements the Fast Pair protocol.
        'BLE_FAST_PAIR': lambda btpeer: btpeer.get_ble_fast_pair,
}
80
# Known failure signatures. Each key is a tag searched for in the collected
# system messages (through bluetooth_facade.messages_find); each value is the
# label recorded in the test results when the tag is found.
# NOTE(review): the last key looks like a regular expression; presumably
# messages_find treats the tags as patterns -- confirm.
COMMON_FAILURES = {
        'Freeing adapter /org/bluez/hci': 'adapter_freed',
        '/var/spool/crash/bluetoothd': 'bluetoothd_crashed',
        'btintel_hw_error': 'intel hardware error detected',
        'qca_hw_error': 'qca hardware error detected',
        'cmd_cnt 0 cmd queued ([5-9]|[1-9][0-9]+)': 'controller cmd capacity',
}

# TODO(b/150898182) - Don't run some tests on tablet form factors
# This list was generated by looking for tablet models on Goldeneye and removing
# the ones that were not launched
TABLET_MODELS = ['kakadu', 'kodama', 'krane', 'dru', 'druwl', 'dumo']

# Some platforms do not have built-in I/O hardware, and so they are configured
# to automatically reconnect to paired HID devices on boot. We note these
# platform types here as there will be different behavior expectations around
# reboot.
RECONNECT_PLATFORM_TYPES = ['CHROMEBOX', 'CHROMEBIT', 'CHROMEBASE']

# TODO(b/158336394) Realtek: Powers down during suspend due to high power usage
#                            during S3.
# TODO(b/168152910) Marvell: Powers down during suspend due to flakiness when
#                            entering suspend.  This will also skip the tests
#                            for Veyron (which don't power down right now) but
#                            reconnect tests are still enabled for that platform
#                            to check for suspend stability.
SUSPEND_POWER_DOWN_CHIPSETS = ['Realtek-RTL8822C-USB', 'MVL-8897', 'MVL-8997']

# All Realtek chipsets on USB will drop their firmware and reload it on
# suspend-resume unless they are connected to a peer device. This doesn't
# include RTL8822, which would reset regardless of the peer.
SUSPEND_RESET_IF_NO_PEER_CHIPSETS = ['Realtek-RTL8852A-USB']

# Models to skip since they power down on suspend.
SUSPEND_POWER_DOWN_MODELS = ['dru', 'druwl', 'dumo']

# Chipsets which do not support Bluetooth Hardware Filtering.
UNSUPPORTED_BT_HW_FILTERING_CHIPSETS = [
        'MVL-8897', 'MVL-8997', 'QCA-6174A-5-USB', 'QCA-6174A-3-UART',
        'QCA-WCN6856', 'Intel-AC7260', 'Intel-AC7265', 'Realtek-RTL8822C-USB',
        'Realtek-RTL8822C-UART', 'Realtek-RTL8852A-USB',
        'Mediatek-MTK7921-USB', 'Mediatek-MTK7921-SDIO'
]

# Symbolic kernel log level names and their numeric values.
KERNEL_LOG_LEVEL = {
        'EMERG': 0,
        'ALERT': 1,
        'CRIT': 2,
        'ERR': 3,
        'WARNING': 4,
        'NOTICE': 5,
        'INFO': 6,
        'DEBUG': 7
}

# The benchmark criterion to determine whether HID device reconnection is
# fast: maximum reconnection time, in seconds, for classic and LE HID devices.
HID_RECONNECT_TIME_MAX_SEC = 3
LE_HID_RECONNECT_TIME_MAX_SEC = 3
139
140
def method_name():
    """Return the name of the function that called this helper.

    Meant to be called from inside a method so that the method can refer to
    its own name (e.g. for logging) without hard-coding it.

    @returns: the name of the calling function/method as a string.
    """
    # The caller is exactly one frame above this helper on the call stack.
    caller_frame = inspect.currentframe().f_back
    return caller_frame.f_code.co_name
150
151
152def _run_method(method, method_name, *args, **kwargs):
153    """Run a target method and capture exceptions if any.
154
155    This is just a wrapper of the target method so that we do not need to
156    write the exception capturing structure repeatedly. The method could
157    be either a device method or a facade method.
158
159    @param method: the method to run
160    @param method_name: the name of the method
161
162    @returns: the return value of target method() if successful.
163              False otherwise.
164
165    """
166    result = False
167    try:
168        result = method(*args, **kwargs)
169    except Exception as e:
170        logging.error('%s: %s', method_name, e)
171    except:
172        logging.error('%s: unexpected error', method_name)
173    return result
174
175
def get_bluetooth_emulated_device(btpeer, device_type):
    """Get the bluetooth emulated device object.

    Creates the emulated device of the requested type through the Bluetooth
    peer, initializes it, and caches its main properties (capabilities,
    name, address, pin, class of service/device, device type, authentication
    mode and serial port) as attributes on the returned device object.

    @param btpeer: the Bluetooth peer device
    @param device_type : the bluetooth device type, e.g., 'MOUSE'

    @returns: the bluetooth device object

    @raises: TestError if the device type is not supported, or if a device
             method still fails after all remediation steps.

    """

    def _retry_device_method(method_name, legal_falsy_values=()):
        """Retry the emulated device's method, repairing the peer on failure.

        The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().

        Note that the method name string is provided to get the device's actual
        method object at run time through getattr(). The rebinding is required
        because a new device may have been created previously or during the
        execution of fix_serial_device().

        Given a device's method, it is not feasible to get the method name
        through __name__ attribute. This limitation is due to the fact that
        the device is a dotted object of an XML RPC server proxy.
        As an example, with the method name 'GetAdvertisedName', we could
        derive the corresponding method device.GetAdvertisedName. On the
        contrary, given device.GetAdvertisedName, it is not feasible to get the
        method name by device.GetAdvertisedName.__name__

        If the device method fails, a remediation step is applied and the
        method is retried. The remediation steps are, in order,
         1) re-creating the serial device.
         2) reset (powercycle) the bluetooth dongle.
         3) reboot Bluetooth peer.
        If the device method still fails after these steps, the test fails.

        @param method_name: the string of the method name.
        @param legal_falsy_values: Values that are falsy but might be OK.

        @returns: the result returned by the device's method if the call was
                  successful

        @raises: TestError if the devices's method fails or if repair of
                 peripheral kit fails

        """

        action_table = [('recreate', 'Fixing the serial device'),
                        ('reset', 'Power cycle the peer device'),
                        ('reboot', 'Reboot the chamleond host')]

        for i, (action, description) in enumerate(action_table):
            # Report 1-based attempt numbers in both log messages.
            attempt = i + 1
            logging.info('Attempt %s : %s ', attempt, method_name)

            result = _run_method(getattr(device, method_name), method_name)
            if _is_successful(result, legal_falsy_values):
                return result

            logging.error('%s failed the %s time. Attempting to %s',
                          method_name, attempt, description)
            if not fix_serial_device(btpeer, device, action):
                logging.info('%s failed', description)
            else:
                logging.info('%s successful', description)

        # Try one last time after the final remediation step.
        result = _run_method(getattr(device, method_name), method_name)
        if _is_successful(result, legal_falsy_values):
            return result

        raise error.TestError('Failed to execute %s. Bluetooth peer device is '
                              'not working' % method_name)

    def _format_class(value):
        """Format a class of service/device value for logging.

        LE-only devices report None, which is logged as is.
        """
        if device._is_le_only:
            return value
        return "0x%04X" % value

    if device_type not in SUPPORTED_DEVICE_TYPES:
        raise error.TestError('The device type is not supported: %s' %
                              device_type)

    # Get the bluetooth device object and query some important properties.
    device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)()

    # Get some properties of the kit
    # NOTE: Strings updated here must be kept in sync with Btpeer.
    device._capabilities = _retry_device_method('GetCapabilities')
    device._transports = device._capabilities["CAP_TRANSPORTS"]
    device._is_le_only = ("TRANSPORT_LE" in device._transports and
                          len(device._transports) == 1)
    device._has_pin = device._capabilities["CAP_HAS_PIN"]
    device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]

    _retry_device_method('Init')
    logging.info('device type: %s', device_type)

    device.name = _retry_device_method('GetAdvertisedName')
    logging.info('device name: %s', device.name)

    device.address = _retry_device_method('GetLocalBluetoothAddress')
    logging.info('address: %s', device.address)

    # A device without a pin legitimately reports None as its pin code.
    pin_falsy_values = [] if device._has_pin else [None]
    device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
    logging.info('pin: %s', device.pin)

    # Class of service/device is None for LE-only devices and may be 0
    # otherwise. Don't fail on (or try to parse) those values.
    class_falsy_values = [None] if device._is_le_only else [0]

    device.class_of_service = _retry_device_method('GetClassOfService',
                                                   class_falsy_values)
    logging.info('class of service: %s',
                 _format_class(device.class_of_service))

    device.class_of_device = _retry_device_method('GetClassOfDevice',
                                                  class_falsy_values)
    logging.info('class of device: %s', _format_class(device.class_of_device))

    device.device_type = _retry_device_method('GetDeviceType')
    logging.info('device type: %s', device.device_type)

    # The authentication mode is only queried for devices that are not
    # LE-only; it stays None otherwise.
    device.authentication_mode = None
    if not device._is_le_only:
        device.authentication_mode = _retry_device_method('GetAuthenticationMode')
        logging.info('authentication mode: %s', device.authentication_mode)

    device.port = _retry_device_method('GetPort')
    logging.info('serial port: %s\n', device.port)

    return device
313
314
def recreate_serial_device(device):
    """Create and connect to a new serial device.

    The old serial device is closed first; a new one is created only if the
    close succeeded.

    @param device: the bluetooth HID device

    @returns: True if the serial device is re-created successfully.
              False if device is None, or if closing the old serial device
              or creating the new one raises an error.

    """
    logging.info('Remove the old serial device and create a new one.')
    if device is None:
        # Nothing to close and nothing to create the serial device on.
        logging.error('failed to invoke CreateSerialDevice: no device.')
        return False

    # Catch Exception rather than everything so that KeyboardInterrupt and
    # SystemExit are not silently turned into a False result.
    try:
        device.Close()
    except Exception as e:
        logging.error('failed to close the serial device: %s', e)
        return False

    try:
        device.CreateSerialDevice()
    except Exception as e:
        logging.error('failed to invoke CreateSerialDevice: %s', e)
        return False
    return True
336
337
def _check_device_init(device, operation):
    """Check that the serial device can be brought into a usable state.

    Verifies, in order, that the device initializes, that its serial
    connection is up, and that it enters command mode. Stops at the first
    step that fails.

    @param device: the bluetooth device to check
    @param operation: name of the recovery operation that was just
                      performed; only used in log messages

    @returns: True if all the checks pass. False otherwise.

    """
    logging.info('Checking device status...')
    # (check, message logged when the check fails); evaluated lazily, in order.
    checks = (
            (lambda: _run_method(device.Init, 'Init'),
             'device.Init: failed after %s'),
            (lambda: device.CheckSerialConnection(),
             'device.CheckSerialConnection: failed after %s'),
            (lambda: _run_method(device.EnterCommandMode, 'EnterCommandMode'),
             'device.EnterCommandMode: failed after %s'),
    )
    for check, failure_message in checks:
        if not check():
            logging.info(failure_message, operation)
            return False
    logging.info('The device is created successfully after %s.', operation)
    return True
353
def _reboot_btpeer(btpeer, device):
    """Reboot the Bluetooth peer and check that the device comes back.

    The device is power cycled as well, since a reboot may not do that.

    @param btpeer: the Bluetooth peer to reboot
    @param device: the bluetooth device attached to the peer

    @returns: True if the device passes the init checks after the reboot.
              False otherwise.

    """
    # Chameleond fizz hosts should have write protect removed and
    # set_gbb_flags set to 0 to minimize boot time
    wait_after_powercycle_secs = 1
    wait_after_reboot_secs = 10

    # Close the bluetooth peripheral device before touching its power.
    device.Close()
    logging.info("Powercycling the device")
    device.PowerCycle()
    time.sleep(wait_after_powercycle_secs)

    logging.info('rebooting Bluetooth peer...')
    btpeer.reboot()

    # A btpeer reboot takes a bit more than the wait below; only start
    # probing the board once that time has elapsed.
    time.sleep(wait_after_reboot_secs)
    return _check_device_init(device, 'reboot')
376
377def _reset_device_power(device):
378    """Power cycle the device."""
379    RESET_SLEEP_SECS = 1
380    try:
381        if not device.PowerCycle():
382            logging.info('device.PowerCycle() failed')
383            return False
384    except:
385        logging.error('exception in device.PowerCycle')
386    else:
387        logging.info('device powercycled')
388    time.sleep(RESET_SLEEP_SECS)
389    return _check_device_init(device, 'reset')
390
391def _is_successful(result, legal_falsy_values=[]):
392    """Is the method result considered successful?
393
394    Some method results, for example that of class_of_service, may be 0 which is
395    considered a valid result. Occassionally, None is acceptable.
396
397    The default values exist for uses of this function before the options were
398    added, ideally we should change zero_ok to False.
399
400    @param result: a method result
401    @param legal_falsy_values: Values that are falsy but might be OK.
402
403    @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if
404              result is None and none_ok.
405    """
406    truthiness_of_result = bool(result)
407    return truthiness_of_result or result in legal_falsy_values
408
409
def _flag_common_failures(instance):
    """Check whether a known common failure occurred during the test run.

    Looks up every tag of COMMON_FAILURES in the collected system messages.
    Each tag found is logged and, when the instance's results are a dict,
    recorded there under the tag's label, to make it easier to identify
    common root causes from Stainless.

    @param instance: the test instance; its bluetooth_facade is queried and
                     its results may be updated

    @returns: True if at least one failure tag was found. False otherwise.
    """
    detected_tags = []

    for fail_tag, fail_log in COMMON_FAILURES.items():
        if not instance.bluetooth_facade.messages_find(fail_tag):
            continue
        detected_tags.append(fail_tag)
        logging.error('Detected failure tag: %s', fail_tag)
        # Mark this instance's results with the discovered failure.
        if type(instance.results) is dict:
            instance.results[fail_log] = True

    return bool(detected_tags)
428
429
def fix_serial_device(btpeer, device, operation='reset'):
    """Try to bring the serial device back to a working state.

    Exactly one recovery step is performed, selected by operation:
      'recreate': re-create the serial device,
      'reset':    power cycle the usb port to which the device is connected,
      'reboot':   reboot the Bluetooth peer.

    Note that rebooting the btpeer board or resetting the device will remove
    the state on the peripheral which might cause test failures. Please use
    reset/reboot only before or after a test.

    @param btpeer: the Bluetooth peer
    @param device: the bluetooth device.
    @param operation: Recovery operation to perform 'recreate/reset/reboot'

    @returns: True if the serial device is fixed. False otherwise, including
              when operation is not one of the supported values.

    """
    if operation == 'recreate':
        if not device.CheckSerialConnection():
            # The serial connection is gone: check whether the device can be
            # initialized again.
            return _check_device_init(device, operation)
        # The USB serial connection still exists, so re-connecting suffices.
        # The problem is usually caused by a serial port change, e.g. from
        # /dev/ttyUSB0 to /dev/ttyUSB1.
        logging.info('retry: creating a new serial device...')
        return recreate_serial_device(device)

    if operation == 'reset':
        # Powercycle the USB port where the bluetooth peer device is connected.
        # RN-42 and RN-52 share the same vid:pid so both will be powercycled.
        # This will only work on fizz host with write protection removed.
        # Note that the state on the device will be lost.
        return _reset_device_power(device)

    if operation == 'reboot':
        # The device is power cycled before rebooting the Bluetooth peer.
        return _reboot_btpeer(btpeer, device)

    logging.error('fix_serial_device Invalid operation %s', operation)
    return False
480
481
def retry(test_method, instance, *args, **kwargs):
    """Run the target facade test_method(), retrying once if it fails.

    A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
    e.g., BluetoothAdapterTests.test_bluetoothd_running().

    Before the second attempt, instances that have a use_btpeer attribute get
    the serial device of every known device re-created; if any of those fixes
    fails, the test is not re-run.

    @param test_method: the test method to retry
    @param instance: the test instance the method is invoked on
    @param args: positional arguments forwarded to the test method
    @param kwargs: keyword arguments forwarded to the test method

    @returns: True if the return value of test_method() is successful.
              False otherwise.

    """
    test_name = test_method.__name__

    def _attempt():
        """Run the test method once and report whether it succeeded."""
        return _is_successful(
                _run_method(test_method, test_name, instance, *args, **kwargs))

    if _attempt():
        return True

    logging.error('%s failed at the 1st time: (%s)', test_name,
                  str(instance.results))
    logging.info('%s: retry the 2nd time.', test_name)
    time.sleep(1)

    # Tests that do not use any attached serial device are simply re-run.
    if hasattr(instance, 'use_btpeer'):
        for device_type in SUPPORTED_DEVICE_TYPES:
            for device in instance.devices[device_type]:
                # fix_serial_device in 'recreate' mode doesn't require the
                # btpeer, so None is passed for convenience.
                if not fix_serial_device(None, device, "recreate"):
                    return False
        logging.info('%s: retry the 2nd time.', test_name)

    return _attempt()
521
522
def test_retry_and_log(test_method_or_retry_flag,
                       messages_start=True,
                       messages_stop=True):
    """A decorator that logs test results, collects error messages, and retries
       on request.

    It can be applied bare (@test_retry_and_log) or called with arguments
    (@test_retry_and_log(False)); the type of the first argument tells the
    two forms apart.

    @param test_method_or_retry_flag: either the test_method or a retry_flag.
        There are some possibilities of this argument:
        1. the test_method to conduct and retry: should retry the test_method.
            This occurs with
            @test_retry_and_log
        2. the retry flag is True. Should retry the test_method.
            This occurs with
            @test_retry_and_log(True)
        3. the retry flag is False. Do not retry the test_method.
            This occurs with
            @test_retry_and_log(False)

    @param messages_start: Start collecting messages before running the test
    @param messages_stop: Stop collecting messages after running the test and
                          analyze the results.

    @returns: a wrapper of the test_method with test log. The retry mechanism
        would depend on the retry flag.

    """

    def decorator(test_method):
        """A decorator wrapper of the decorated test_method.

        @param test_method: the test method being decorated.

        @returns the wrapper of the test method.

        """
        @functools.wraps(test_method)
        def wrapper(instance, *args, **kwargs):
            """A wrapper of the decorated method.

            Runs the test method (through retry() when retrying is enabled),
            compares the outcome against instance._expected_result, logs the
            verdict with the elapsed time and appends a message to
            instance.fails on failure. instance._expected_result is reset to
            True afterwards.

            @param instance: an BluetoothAdapterTests instance

            @returns the result of the test method

            @raises error.TestFail if the test failed and the instance has
                    a truthy fail_fast attribute. TestNAError, TestError and
                    TestFail raised while running the test are re-raised.

            """
            # Per-test bookkeeping is reset on the instance before each run.
            instance.results = None
            fail_msg = None
            test_result = False
            should_raise = hasattr(instance, 'fail_fast') and instance.fail_fast

            # Recorded so that assert_on_fail() can name the failing test.
            instance.last_test_method = test_method.__name__
            syslog_captured = False

            try:
                logging.info('[>>> running: {}]'.format(test_method.__name__))
                start_time = time.time()
                if messages_start:
                    # Grab /var/log/messages output during test run
                    instance.bluetooth_facade.messages_start()

                # Retry when used as a bare decorator (the argument is the
                # test method itself) or when the retry flag is truthy.
                if callable(test_method_or_retry_flag
                            ) or test_method_or_retry_flag:
                    test_result = retry(test_method, instance, *args, **kwargs)
                else:
                    test_result = test_method(instance, *args, **kwargs)

                # NOTE: messages_stop() is skipped if the test method raised.
                if messages_stop:
                    syslog_captured = instance.bluetooth_facade.messages_stop()

                # Only scan for known failure signatures when messages were
                # actually captured; the flag on the instance is sticky.
                if syslog_captured:
                    had_failure = _flag_common_failures(instance)
                    instance.had_known_common_failure = any(
                            [instance.had_known_common_failure, had_failure])

                logging.debug('instance._expected_result : %s',
                              instance._expected_result)
                elapsed_time = 'elapsed_time: {:.3f}s'.format(time.time() -
                                                              start_time)
                if instance._expected_result:
                    # The test is expected to pass.
                    if test_result:
                        logging.info('[*** passed: {}] {}'.format(
                                test_method.__name__, elapsed_time))
                    else:
                        fail_msg = '[--- failed: {} ({})]'.format(
                                test_method.__name__, str(instance.results))
                        logging.error('{} {}'.format(fail_msg, elapsed_time))
                        instance.fails.append(fail_msg)
                else:
                    if test_result:
                        # The test is expected to fail; but it passed.
                        reason = 'expected fail, actually passed'
                        fail_msg = '[--- failed: {} ({})]'.format(
                                test_method.__name__, reason)
                        logging.error('{} {}'.format(fail_msg, elapsed_time))
                        instance.fails.append(fail_msg)
                    else:
                        # The test is expected to fail; and it did fail.
                        reason = 'expected fail, actually failed'
                        logging.info('[*** passed: {} ({})] {}'.format(
                                test_method.__name__, reason, elapsed_time))

            # Reset _expected_result and let the quicktest wrapper catch it.
            # These errors should skip out of the testcase entirely.
            except (error.TestNAError, error.TestError, error.TestFail):
                instance._expected_result = True
                raise

            # Next test_method should pass by default.
            instance._expected_result = True

            # Check whether we should fail fast
            if fail_msg and should_raise:
                logging.info('Fail fast')
                raise error.TestFail(instance.fails)

            return test_result

        return wrapper

    if callable(test_method_or_retry_flag):
        # If the decorator function comes with no argument like
        # @test_retry_and_log
        return decorator(test_method_or_retry_flag)
    else:
        # If the decorator function comes with an argument like
        # @test_retry_and_log(False)
        return decorator
649
650
651def test_case_log(method):
652    """A decorator for test case methods.
653
654    The main purpose of this decorator is to display the test case name
655    in the test log which looks like
656
657        <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
658
659    @param method: the test case method to decorate.
660
661    @returns: a wrapper function of the decorated method.
662
663    """
664    @functools.wraps(method)
665    def wrapper(instance, *args, **kwargs):
666        """Log the name of the wrapped method before execution"""
667        logging.info('\n<... %s ...>', method.__name__)
668        method(instance, *args, **kwargs)
669    return wrapper
670
671
672class BluetoothAdapterTests(test.test):
673    """Server side bluetooth adapter tests.
674
675    This test class tries to thoroughly verify most of the important work
676    states of a bluetooth adapter.
677
678    The various test methods are supposed to be invoked by actual autotest
679    tests such as server/cros/site_tests/bluetooth_Adapter*.
680
681    """
682    version = 1
683    ADAPTER_ACTION_SLEEP_SECS = 1
684    ADAPTER_PAIRING_TIMEOUT_SECS = 60
685    ADAPTER_CONNECTION_TIMEOUT_SECS = 30
686    # Wait after connect for input device to be ready for use
687    ADAPTER_HID_INPUT_DELAY = 5
688    ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
689    ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
690    ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
691    ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
692    ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
693    ADAPTER_WAKE_ENABLE_TIMEOUT_SECS = 30
694
695    ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
696    ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1
697
698    HID_REPORT_SLEEP_SECS = 1
699
700
701    DEFAULT_START_DELAY_SECS = 0
702    DEFAULT_HOLD_INTERVAL_SECS = 10
703    DEFAULT_HOLD_TIMEOUT_SECS = 60
704    DEFAULT_HOLD_SLEEP_SECS = 1
705
706    # Default suspend time in seconds for suspend resume.
707    SUSPEND_TIME_SECS=10
708    SUSPEND_ENTER_SECS=10
709    RESUME_TIME_SECS=30
710    RESUME_INTERNAL_TIMEOUT_SECS = 180
711
712    # Minimum RSSI required for peer devices during testing
713    MIN_RSSI = -70
714
    # hci0 is the default hci device if there is no external bluetooth dongle.
    EXPECTED_HCI = 'hci0'

    # Bit masks that split a 24-bit class value into its upper 11 bits
    # (service) and its lower 13 bits (device).
    CLASS_OF_SERVICE_MASK = 0xFFE000
    CLASS_OF_DEVICE_MASK = 0x001FFF

    # Constants about advertising.
    # The default minimum and maximum intervals have the same value.
    DEFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 200
    DEFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 200
    # NOTE(review): presumably the duration in ms of one advertising interval
    # unit -- confirm against the code that uses it.
    ADVERTISING_INTERVAL_UNIT = 0.625

    # Error messages about advertising dbus methods.
    ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
            'org.bluez.Error.NotPermitted: Maximum advertisements reached')
    ERROR_INVALID_ADVERTISING_INTERVALS = (
            'org.bluez.Error.InvalidArguments: Invalid arguments')

    # Profiles supported by ChromeOS: maps a profile name to its UUID string.
    SUPPORTED_UUIDS = {
            'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
            'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
            'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
            'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
            'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}

    # Board list for the name/ID test check. These devices don't need to be
    # tested.
    REFERENCE_BOARDS = [
            'rambi', 'nyan', 'oak', 'reef', 'yorp', 'bip', 'volteer',
            'volteer2'
    ]

    # Path for btmon logs (directory used by start_new_btmon).
    BTMON_DIR_LOG_PATH = '/var/log/btmon'

    # Path for usbmon logs (directory used by start_new_usbmon).
    USBMON_DIR_LOG_PATH = '/var/log/usbmon'

    # Parameters for usbmon log rotation. start_new_usbmon passes them to
    # tcpdump as the -C (file size) and -W (file count) options.
    USBMON_SINGLE_FILE_MAX_SIZE = '10M'  # 10M bytes
    USBMON_NUM_OF_ROTATE_FILE = 2

    # The agent capability of various device types, keyed by the device type
    # string. start_agent/stop_agent look up device.GetDeviceType() here.
    # Currently all non-Fast Pair types are set to NoInputNoOutput since we
    # don't have a way to report the displayed passkey to the device in case
    # of Passkey Entry. Therefore, use 'Just Works'.
    # Fast Pair uses DisplayYesNo because this is expected by that protocol.
    # TODO(b/181945748) update the capabilities when Passkey Entry is supported.
    AGENT_CAPABILITY = {
            'BLE_MOUSE': 'NoInputNoOutput',
            'BLE_KEYBOARD': 'NoInputNoOutput',
            'BLE_PHONE': 'NoInputNoOutput',
            'BLUETOOTH_AUDIO': 'NoInputNoOutput',
            'FAST_PAIR': 'DisplayYesNo',
            'KEYBOARD': 'NoInputNoOutput',
            'MOUSE': 'NoInputNoOutput',
    }
771
772    def assert_on_fail(self, result, raiseNA=False):
773        """ If the called function returns a false-like value, raise an error.
774
775        Call test methods (i.e. with @test_retry_and_log) wrapped with this
776        function and failures will raise instead of continuing the test.
777
778        For example:
779            self.assert_on_fail(self.test_pairing(...))
780
781        @param result: Result of test method called.
782        @param raiseNA: Whether to raise TestNAError instead of TestFail
783
784        @raises error.TestNAError
785        @raises error.TestFail
786        """
787        if not result:
788            failure_msg = 'Assert on fail: {}'.format(self.last_test_method)
789            logging.error(failure_msg)
790            if raiseNA:
791                raise error.TestNAError(failure_msg)
792            else:
793                raise error.TestFail(failure_msg)
794
795
796    def expect_fail(self, test_method, *args, **kwargs):
797        """Run the test_method which is expected to fail.
798
799        Here a test means one that comes with the @test_retry_and_log
800        decorator.
801
802        In most cases, a test is expected to pass by default. However, in
803        some cases, we may expect a test to fail. As an example, a test is
804        expected to fail if the behavior is disallowed by the policy. In
805        this case, the failure is considered a pass. The example statements
806        look like
807
808            # Set an arbitrary UUID that disallows the HID device.
809            self.test_check_set_allowlist('0xabcd', True)
810
811            # Since the HID device is disallowed above,
812            # the self.test_keyboard_input_from_trace test itself would
813            # fail which is expected.
814            self.expect_fail(self.test_keyboard_input_from_trace,
815                             device, "simple_text")
816
817        In the log, the message would show that the keyboard input failed as
818
819            test_keyboard_input_from_trace: InputEventRecorderError: Failed to
820            find the device node of KEYBD_REF.
821
822        As a result, the log message shows that the test conceptually passed.
823
824            [*** passed: test_keyboard_input_from_trace (expected fail,
825                                                         actually failed)]
826
827        @param test_method: the test method to run.
828
829        @returns: True if the test method failed; False otherwise.
830        """
831        self._expected_result = False
832        logging.debug('self._expected_result %s', self._expected_result)
833        return test_method(*args, **kwargs)
834
835
836    def expect_test(self, expected_result, test_method, *args, **kwargs):
837        """Run the test method and expect the test result as expected_result.
838
839        This little helper is used to make simple the following statements
840
841            if expected_result:
842                self.test_xxx(device)
843            else:
844                self.expect_fail(self.test_xxx, device)
845
846        which can be converted to
847
848            self.expect_test(expected_result, self.test_xxx, device)
849
850        @param expected_result: True if the test is expected to pass;
851                False otherwise.
852        @param test_method: the test method to run.
853
854        @returns: True if the test result matches expected_result;
855                False otherwise.
856        """
857        if expected_result:
858            # If the test is expected to pass, just run it normally.
859            return test_method(*args, **kwargs)
860        else:
861            # If the test is expected to fail, run it throuigh self.expect_fail
862            # to handle the failure.
863            return self.expect_fail(test_method, *args, **kwargs)
864
865
866    # TODO(b/131170539) remove when sarien/arcada no longer have _signed
867    # postfix
868    def get_base_platform_name(self):
869        """Returns the DUT platform name
870
871        If the DUT is a DVT device, _signed or _unsigned may be appended
872            to the device name, which we should ignore in our BT tests
873
874        @returns: String name of the DUT's platform with _signed or
875                _unsigned removed
876        """
877
878        platform = self.host.get_platform()
879
880        return platform.replace('_signed', '').replace('_unsigned', '')
881
882    def platform_will_reconnect_on_boot(self):
883        """Indicates if we should expect DUT to automatically reconnect on boot
884
885        Some platforms do not have built-in I/O (i.e. ChromeBox) and will
886        automatically reconnect to paired HID devices on boot.
887
888        @returns: True if platform will reconnect on boot, else False
889        """
890
891        return self.host.get_board_type() in RECONNECT_PLATFORM_TYPES
892
893    def group_btpeers_type(self):
894        """Group all Bluetooth peers by the type of their detected device."""
895
896        # Use previously created btpeer_group instead of creating new
897        if len(self.btpeer_group_copy) > 0:
898            logging.info('Using previously created btpeer group')
899            for device_type in SUPPORTED_DEVICE_TYPES:
900                self.btpeer_group[device_type] = \
901                    self.btpeer_group_copy[device_type][:]
902            return
903
904        # Create new btpeer_group
905        for device_type in SUPPORTED_DEVICE_TYPES:
906            self.btpeer_group[device_type] = list()
907            # Create copy of btpeer_group
908            self.btpeer_group_copy[device_type] = list()
909
910        for idx, btpeer in enumerate(self.host.btpeer_list):
911            for device_type,gen_device_func in SUPPORTED_DEVICE_TYPES.items():
912                try:
913                    device = gen_device_func(btpeer)()
914                    if device.CheckSerialConnection():
915                        self.btpeer_group[device_type].append(btpeer)
916                        logging.debug('%d-th btpeer find device %s', \
917                                     idx, device_type)
918                        # Create copy of btpeer_group
919                        self.btpeer_group_copy[device_type].append(btpeer)
920                except:
921                    logging.debug('Error with initializing %s on %d-th'
922                                  'btpeer', device_type, idx)
923            if len(self.btpeer_group[device_type]) == 0:
924                logging.error('No device is detected on %d-th btpeer', idx)
925
926        logging.debug("self.bt_group is %s",self.btpeer_group)
927
928
929    def wait_for_device(self, device, timeout=10):
930        """Waits for device to become available again
931
932        We reset raspberry pi peer between tests. This method helps us wait to
933        prevent us from trying to use the device before it comes back up again.
934
935        @param device: proxy object of peripheral device
936        """
937
938        def is_device_ready():
939            """Tries to use a service of the device
940
941            @returns: True if device is available to provide service
942                      False otherwise
943            """
944
945            try:
946                # Call a simple (fast) function to determine if device is online
947                # and reachable. If we can query this property, we know the
948                # device is available for us to use
949                getattr(device, 'GetCapabilities')()
950
951            except Exception as e:
952                return False
953
954            return True
955
956
957        try:
958            utils.poll_for_condition(condition=is_device_ready,
959                                     desc='wait_for_device',
960                                     timeout=timeout)
961
962        except utils.TimeoutError as e:
963            raise error.TestError('Peer is not available after waiting')
964
965
966    def clear_raspi_device(self, device, next_device_type=None):
967        """Clears a device on a raspi peer by resetting bluetooth stack
968
969        @param device: proxy object of peripheral device
970        """
971
972        try:
973            device.ResetStack(next_device_type)
974
975        except socket.error as e:
976            # Ignore conn reset, expected during stack reset
977            if e.errno != errno.ECONNRESET:
978                raise
979
980        except chameleon.ChameleonConnectionError as e:
981            # Ignore chameleon conn reset, expected during stack reset
982            if (str(errno.ECONNRESET) not in str(e) and
983                    'ResetStack' not in str(e)):
984                raise
985            logging.info('Ignored exception due to ResetStack: %s', str(e))
986
987        except six.moves.http_client.BadStatusLine as e:
988            # BadStatusLine occurs occasionally when chameleon
989            # is restarted. We ignore it here
990            logging.error('Ignoring badstatusline exception')
991            pass
992
993        # Catch generic Fault exception by rpc server, ignore
994        # method not available as it indicates platform didn't
995        # support method and that's ok
996        except Exception as e:
997            if not (e.__class__.__name__ == 'Fault' and
998                'is not supported' in str(e)):
999                raise
1000
1001        # Ensure device is back online before continuing
1002        self.wait_for_device(device, timeout=30)
1003
1004    def device_set_powered(self, device, powered):
1005        """Set raspi BT powered state.
1006
1007        @param powered: Powered state to set on Raspi.
1008        """
1009        if powered:
1010            device.AdapterPowerOn()
1011        else:
1012            device.AdapterPowerOff()
1013
1014    def get_device_rasp(self, device_num, on_start=True):
1015        """Get all bluetooth device objects from Bluetooth peer devices
1016        This method should be called only after group_btpeers_type
1017        @param device_num : dict of {device_type:number}, to specify the number
1018                            of device needed for each device_type.
1019
1020        @param on_start: boolean describing whether the requested clear is for a
1021                            new test, or in the middle of a current one
1022
1023        @returns: True if Success.
1024        """
1025
1026        logging.info("in get_device_rasp %s onstart %s", device_num, on_start)
1027        total_num_devices = sum(device_num.values())
1028        if total_num_devices > len(self.host.btpeer_list):
1029            logging.error(
1030                    'Total number of devices %s is greater than the'
1031                    ' number of Bluetooth peers %s', total_num_devices,
1032                    len(self.host.btpeer_list))
1033            return False
1034
1035        for device_type, number in device_num.items():
1036            total_num_devices += number
1037            if len(self.btpeer_group[device_type]) < number:
1038                logging.error('Number of Bluetooth peers with device type'
1039                      '%s is %d, which is less then needed %d', device_type,
1040                      len(self.btpeer_group[device_type]), number)
1041                return False
1042
1043            for btpeer in self.btpeer_group[device_type][:number]:
1044                logging.info("getting emulated %s", device_type)
1045                device = self.reset_btpeer(btpeer, device_type, on_start)
1046
1047                self.devices[device_type].append(device)
1048
1049                # Remove this btpeer from btpeer_group since it is already
1050                # configured as a specific device
1051                for temp_device in SUPPORTED_DEVICE_TYPES:
1052                    if btpeer in self.btpeer_group[temp_device]:
1053                        self.btpeer_group[temp_device].remove(btpeer)
1054
1055        return True
1056
1057    def get_peer_device_type(self, device):
1058        """Determine the type of peer a device is emulating
1059
1060        Sometimes it is useful to be able to flexibly determine what type of
1061        peripheral a device is emulating. This helper function does a reverse
1062        look-up to determine what type it was registered as.
1063
1064        @param device: the emulated peer device
1065
1066        @returns: the emulated device type if found, e.g. 'MOUSE' or
1067            'BLE_KEYBOARD', else None
1068        """
1069
1070        for device_type, device_list in self.devices.items():
1071            if device in device_list:
1072                return device_type
1073
1074        return None
1075
1076    def get_device(self, device_type, on_start=True):
1077        """Get the bluetooth device object.
1078
1079        @param device_type : the bluetooth device type, e.g., 'MOUSE'
1080
1081        @param on_start: boolean describing whether the requested clear is for a
1082                            new test, or in the middle of a current one
1083
1084        @returns: the bluetooth device object
1085
1086        """
1087
1088        self.devices[device_type].append(
1089                self.reset_btpeer(self.host.btpeer, device_type, on_start))
1090
1091        return self.devices[device_type][-1]
1092
1093
1094    def reset_emulated_device(self, device, device_type, clear_device=True):
1095        """Reset the emulated device in order to be used as a different type.
1096
1097        @param device: the emulated peer device to reset with new device type
1098        @param device_type : the new bluetooth device type, e.g., 'MOUSE'
1099        @param clear_device: whether to clear the device state
1100
1101        @returns: the bluetooth device object
1102
1103        """
1104        # Re-fresh device to clean state if test is starting
1105        if clear_device:
1106            self.clear_raspi_device(device, next_device_type=device_type)
1107
1108        try:
1109            # Tell generic chameleon to bind to this device type
1110            device.SpecifyDeviceType(device_type)
1111
1112        # Catch generic Fault exception by rpc server, ignore method not
1113        # available as it indicates platform didn't support method and that's
1114        # ok
1115        except Exception as e:
1116            if not (e.__class__.__name__ == 'Fault' and
1117                    'is not supported' in str(e)):
1118                logging.error("got exception %s", str(e))
1119                raise
1120
1121        return device
1122
1123    def reset_btpeer(self, peer, device_type, clear_device=True):
1124        """Reset the btpeer device in order to be used as a different type.
1125
1126        @param peer: the btpeer device to reset with new device type
1127        @param device_type : the new bluetooth device type, e.g., 'MOUSE'
1128        @param clear_device: whether to clear the device state
1129
1130        @returns: the bluetooth device object
1131
1132        """
1133        device = get_bluetooth_emulated_device(peer, device_type)
1134
1135        return self.reset_emulated_device(device, device_type, clear_device)
1136
1137    def is_device_available(self, btpeer, device_type):
1138        """Determines if the named device is available on the linked peer
1139
1140        @param device_type: the bluetooth HID device type, e.g., 'MOUSE'
1141
1142        @returns: True if it is able to resolve the device, false otherwise
1143        """
1144
1145        device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)()
1146        try:
1147            # The proxy prevents us from checking if the object is None directly
1148            # so instead we call a fast method that any peripheral must support.
1149            # This will fail if the object over the proxy doesn't exist
1150            getattr(device, 'GetCapabilities')()
1151
1152        except Exception as e:
1153            return False
1154
1155        return True
1156
1157
1158    def list_devices_available(self):
1159        """Queries which devices are available on btpeer(s)
1160
1161        @returns: dict mapping HID device types to number of supporting peers
1162                  available, e.g. {'MOUSE':1, 'KEYBOARD':1}
1163        """
1164        devices_available = {}
1165        for device_type in SUPPORTED_DEVICE_TYPES:
1166            for btpeer in self.host.btpeer_list:
1167                if self.is_device_available(btpeer, device_type):
1168                    devices_available[device_type] = \
1169                        devices_available.get(device_type, 0) + 1
1170
1171        logging.debug("devices available are %s", devices_available)
1172        return devices_available
1173
1174
    def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
        """Suspend the DUT for a while and then resume.

        Records the boot id, starts the suspend through suspend_async, and
        then runs test_suspend_and_wait_for_sleep followed by
        test_wait_for_resume.

        @param suspend_time: the suspend time in secs; also used as the
                resume timeout
        @raises error.TestFail: documented to be raised if the device
                reboots during suspend. The raise is not in this method --
                presumably it comes from the helpers called below; confirm
                there.

        """
        # Captured before suspending; passed to test_wait_for_resume below.
        boot_id = self.host.get_boot_id()
        suspend = self.suspend_async(suspend_time=suspend_time)
        # DUT time taken right after the suspend is kicked off; handed to
        # test_wait_for_resume as the test start time.
        start_time = self.bluetooth_facade.get_device_utc_time()

        # Give the system some time to enter suspend
        self.test_suspend_and_wait_for_sleep(
                suspend, sleep_timeout=self.SUSPEND_ENTER_SECS)

        # Wait for resume - since we're not testing suspend itself, we are
        # lenient with the resume time here
        self.test_wait_for_resume(boot_id,
                                  suspend,
                                  resume_timeout=suspend_time,
                                  test_start_time=start_time)
1196
1197
1198    def reboot(self):
1199        """Reboot the DUT and recreate necessary processes and variables"""
1200        self.host.reboot()
1201
1202        # We need to recreate the bluetooth_facade after a reboot.
1203        # Delete the proxy first so it won't delete the old one, which
1204        # invokes disconnection, after creating the new one.
1205        if hasattr(self, 'factory'):
1206            del self.factory
1207        if hasattr(self, 'bluetooth_facade'):
1208            del self.bluetooth_facade
1209        if hasattr(self, 'input_facade'):
1210            del self.input_facade
1211        self.factory = remote_facade_factory.RemoteFacadeFactory(
1212                self.host,
1213                disable_arc=True,
1214                no_chrome=not self.start_browser,
1215                force_python3=True)
1216        self.bluetooth_facade = self.factory.create_bluetooth_facade(
1217                self.floss)
1218        self.input_facade = self.factory.create_input_facade()
1219
1220        # Re-enable debugging verbose since Chrome will set it to
1221        # default(disable).
1222        self.enable_disable_debug_log(enable=True)
1223
1224        # Re-disable cellular
1225        self.enable_disable_cellular(enable=False)
1226
1227        # Re-disable ui
1228        self.enable_disable_ui(enable=False)
1229
1230        self.start_new_btmon()
1231        self.start_new_usbmon(reboot=True)
1232
1233
1234    def _wait_till_condition_holds(self, func, method_name,
1235                                   timeout=DEFAULT_HOLD_TIMEOUT_SECS,
1236                                   sleep_interval=DEFAULT_HOLD_SLEEP_SECS,
1237                                   hold_interval=DEFAULT_HOLD_INTERVAL_SECS,
1238                                   start_delay=DEFAULT_START_DELAY_SECS):
1239        """ Wait for the func() to hold true for a period of time
1240
1241
1242        @param func: the function to wait for.
1243        @param method_name: the invoking class method.
1244        @param timeout: number of seconds to wait before giving up.
1245        @param sleep_interval: the interval in seconds to sleep between
1246                invoking func().
1247        @param hold_interval: the interval in seconds for the condition to
1248                             remain true
1249        @param start_delay: interval in seconds to wait before starting
1250
1251        @returns: True if the condition is met,
1252                  False otherwise
1253
1254        """
1255        if start_delay > 0:
1256            logging.debug('waiting for %s secs before checking %s',start_delay,
1257                          method_name)
1258            time.sleep(start_delay)
1259
1260        try:
1261            utils.poll_till_condition_holds(condition=func,
1262                                            timeout=timeout,
1263                                            sleep_interval=sleep_interval,
1264                                            hold_interval = hold_interval,
1265                                            desc=('Waiting %s' % method_name))
1266            return True
1267        except utils.TimeoutError as e:
1268            logging.error('%s: %s', method_name, e)
1269        except Exception as e:
1270            logging.error('%s: %s', method_name, e)
1271            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
1272            logging.error(err)
1273        except:
1274            logging.error('%s: unexpected error', method_name)
1275        return False
1276
1277
1278    def _wait_for_condition(self, func, method_name,
1279                            timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
1280                            sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS,
1281                            start_delay=DEFAULT_START_DELAY_SECS):
1282        """Wait for the func() to become True.
1283
1284        @param func: the function to wait for.
1285        @param method_name: the invoking class method.
1286        @param timeout: number of seconds to wait before giving up.
1287        @param sleep_interval: the interval in seconds to sleep between
1288                invoking func().
1289        @param start_delay: interval in seconds to wait before starting
1290
1291        @returns: True if the condition is met,
1292                  False otherwise
1293
1294        """
1295
1296        if start_delay > 0:
1297            logging.debug('waiting for %s secs before checking %s',start_delay,
1298                          method_name)
1299            time.sleep(start_delay)
1300
1301        try:
1302            utils.poll_for_condition(condition=func,
1303                                     timeout=timeout,
1304                                     sleep_interval=sleep_interval,
1305                                     desc=('Waiting %s' % method_name))
1306            return True
1307        except utils.TimeoutError as e:
1308            logging.error('%s: %s', method_name, e)
1309        except Exception as e:
1310            logging.error('%s: %s', method_name, e)
1311            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
1312            logging.error(err)
1313        except:
1314            logging.error('%s: unexpected error', method_name)
1315        return False
1316
1317    def ignore_failure(instance, test_method, *args, **kwargs):
1318        """ Wrapper to prevent a test_method failure from failing the test batch
1319
1320        Sometimes a test method needs to be used as a normal function, for its
1321        result. This wrapper prevent test_method failure being recorded in
1322        instance.fails and causing a failure of the quick test batch.
1323
1324        @param test_method: test_method
1325        @returns: result of the test_method
1326        """
1327
1328        original_fails = instance.fails[:]
1329        test_result = test_method(*args, **kwargs)
1330        if not test_result:
1331            logging.info("%s failure is ignored",test_method.__name__)
1332            instance.fails = original_fails
1333        return test_result
1334
1335
1336    def start_agent(self, device):
1337        """Start the pairing agent of the device if applicable.
1338
1339        @param device: the peer device
1340        """
1341        dev_type = device.GetDeviceType()
1342        capability = self.AGENT_CAPABILITY.get(dev_type)
1343        if capability:
1344            device.StartPairingAgent(capability)
1345
1346
1347    def stop_agent(self, device):
1348        """Stop the pairing agent of the device if applicable.
1349
1350        @param device: the peer device
1351        """
1352        dev_type = device.GetDeviceType()
1353        capability = self.AGENT_CAPABILITY.get(dev_type)
1354        if capability:
1355            device.StopPairingAgent()
1356
1357
1358    # -------------------------------------------------------------------
    # Adapter standalone tests
1360    # -------------------------------------------------------------------
1361
1362
1363    def service_exists(self, service_name):
1364        """Checks if a service exists on the DUT
1365
1366        @param service_name: name of the service
1367
1368        @returns: True if service status can be queried, else False
1369        """
1370
1371        status_cmd = 'initctl status {}'.format(service_name)
1372        try:
1373            # Querying the status of a non-existent service throws an
1374            # AutoservRunError exception.  If no exception is thrown, we know
1375            # the service exists
1376            self.host.run(status_cmd)
1377
1378        except error.AutoservRunError:
1379            return False
1380
1381        return True
1382
1383
1384    def service_enabled(self, service_name):
1385        """Checks if a service is running on the DUT
1386
1387        @param service_name: name of the service
1388
1389        @throws: AutoservRunError is thrown if there is no service with the
1390                provided name installed on the DUT.
1391
1392        @returns: True if service is currently running, else False
1393        """
1394
1395        status_cmd = 'initctl status {}'.format(service_name)
1396        output = self.host.run(status_cmd).stdout
1397
1398        return 'start/running' in output
1399
1400
1401    def _initctl_services(self, services, command):
1402        """Use initctl to control service on the DUT
1403
1404        @param services: list of string service names
1405        @param command: initctl command on the services
1406                        'start': to start the service
1407                        'stop': to stop the service
1408                        'restart': to restart the service
1409
1410        @returns: True if services were set successfully, else False
1411        """
1412        for service in services:
1413            # Some platforms will not support all services. In these cases,
1414            # no need to fail, since they won't interfere with our tests
1415            if not self.service_exists(service):
1416                logging.debug('Service %s does not exist on DUT', service)
1417                continue
1418
1419            # A sample call to enable or disable a service is as follows:
1420            # "initctl stop modemfwd"
1421            if command in ['start', 'stop']:
1422                enable = command == 'start'
1423                if self.service_enabled(service) != enable:
1424                    self.host.run('initctl {} {}'.format(command, service))
1425
1426                if self.service_enabled(service) != enable:
1427                    logging.error('Failed to set initctl service to state %d',
1428                                  enable)
1429                    return False
1430
1431                if enable:
1432                    logging.info('Service {} enabled'.format(service))
1433                else:
1434                    logging.info('Service {} disabled'.format(service))
1435
1436            elif command == 'restart':
1437                if self.service_enabled(service):
1438                    self.host.run('initctl {} {}'.format(command, service))
1439                else:
1440                    # Just start a stopped job.
1441                    self.host.run('initctl {} {}'.format('start', service))
1442                logging.info('Service {} restarted'.format(service))
1443
1444            else:
1445                logging.error('unknown command {} on services {}'.format(
1446                              command, services))
1447                return False
1448
1449        return True
1450
1451
1452    def enable_disable_services(self, services, enable):
1453        """Enable or disable service on the DUT
1454
1455        @param services: list of string service names
1456        @param enable: True to enable services, False to disable
1457
1458        @returns: True if services were set successfully, else False
1459        """
1460        command = 'start' if enable else 'stop'
1461        return self._initctl_services(services, command)
1462
1463
1464    def enable_disable_cellular(self, enable):
1465        """Enable cellular services on the DUT
1466
1467        @param enable: True to enable cellular services
1468                       False to disable cellular services
1469
1470        @returns: True if services were set successfully, else False
1471        """
1472        cellular_services = ['modemmanager', 'modemfwd']
1473
1474        return self.enable_disable_services(cellular_services, enable)
1475
1476
1477    def enable_disable_ui(self, enable):
1478        """Enable UI service on the DUT
1479
1480        @param enable: True to enable UI services
1481                       False to disable UI services
1482
1483        @returns: True if services were set successfully, else False
1484        """
1485        ui_services = ['ui']
1486
1487        return self.enable_disable_services(ui_services, enable)
1488
1489
    def restart_services(self, services):
        """Restart services on the DUT.

        Delegates to _initctl_services with the 'restart' command.

        @param services: the services, e.g., ['cras',]

        @returns: True if services were set successfully, else False
        """
        return self._initctl_services(services, 'restart')
1498
1499
    def restart_cras(self):
        """Restart the cras service on the DUT.

        Delegates to self.bluetooth_facade.restart_cras().

        @returns: the facade's result; documented as True if cras was
                restarted successfully, else False
        """
        return self.bluetooth_facade.restart_cras()
1506
1507
1508    def enable_disable_debug_log(self, enable):
1509        """Enable or disable debug log in DUT
1510        @param enable: True to enable all of the debug log,
1511                       False to disable all of the debug log.
1512        """
1513        level = int(enable)
1514        self.bluetooth_facade.set_debug_log_levels(level, level)
1515
1516    def enable_disable_quality_debug_log(self, enable):
1517        """Enable or disable bluez quality debug log in the DUT
1518        @param enable: True to enable all of the debug log,
1519                       False to disable all of the debug log.
1520        """
1521        self.bluetooth_facade.set_quality_debug_log(bool(enable))
1522
1523    def start_new_btmon(self):
1524        """ Start a new btmon process and save the log """
1525
1526        # Kill all btmon process before creating a new one
1527        self.host.run('pkill btmon || true')
1528
1529        # Make sure the directory exists
1530        self.host.run('mkdir -p %s' % self.BTMON_DIR_LOG_PATH)
1531
1532        # Time format. Ex, 2020_02_20_17_52_45
1533        now = time.strftime("%Y_%m_%d_%H_%M_%S")
1534        file_name = 'btsnoop_%s' % now
1535
1536        path = os.path.join(self.BTMON_DIR_LOG_PATH, file_name)
1537        self.host.run_background('btmon -SAw %s' % path)
1538        return path
1539
1540    def start_new_usbmon(self, reboot=False):
1541        """ Start a new USBMON process and save the log
1542
1543        @param reboot: True to indicate we are starting new usbmon on reboot
1544                       False otherwise
1545        """
1546
1547        # Kill all usbmon process before creating a new one
1548        self.host.run('pkill tcpdump || true')
1549
1550        # Delete usbmon logs from previous tests unless we are starting another
1551        # usbmon because of reboot.
1552        if not reboot:
1553            self.host.run('rm -f %s/*' % (self.USBMON_DIR_LOG_PATH))
1554
1555        # Make sure the directory exists
1556        self.host.run('mkdir -p %s' % self.USBMON_DIR_LOG_PATH)
1557
1558        # Time format. Ex, 2020_02_20_17_52_45
1559        now = time.strftime("%Y_%m_%d_%H_%M_%S")
1560        file_name = 'usbmon_%s' % now
1561        self.host.run_background('tcpdump -i usbmon0 -w %s/%s -C %s -W %d' %
1562                                 (self.USBMON_DIR_LOG_PATH, file_name,
1563                                  self.USBMON_SINGLE_FILE_MAX_SIZE,
1564                                  self.USBMON_NUM_OF_ROTATE_FILE))
1565
1566
1567    def log_message(self, msg):
1568        """ Write a string to log."""
1569        self.bluetooth_facade.log_message(msg)
1570
1571    def is_wrt_supported(self):
1572        """ Check if Bluetooth adapter support WRT logs. """
1573        return self.bluetooth_facade.is_wrt_supported()
1574
1575    def enable_wrt_logs(self):
1576        """ Enable WRT logs from Intel Adapters."""
1577        return self.bluetooth_facade.enable_wrt_logs()
1578
1579    def collect_wrt_logs(self):
1580        """ Collect WRT logs from Intel Adapters."""
1581        return self.bluetooth_facade.collect_wrt_logs()
1582
1583    @test_retry_and_log
1584    def test_bluetoothd_running(self):
1585        """Test that bluetoothd is running."""
1586        return self.bluetooth_facade.is_bluetoothd_running()
1587
1588
1589    @test_retry_and_log
1590    def test_start_bluetoothd(self):
1591        """Test that bluetoothd could be started successfully."""
1592        return self.bluetooth_facade.start_bluetoothd()
1593
1594
1595    @test_retry_and_log
1596    def test_stop_bluetoothd(self):
1597        """Test that bluetoothd could be stopped successfully."""
1598        return self.bluetooth_facade.stop_bluetoothd()
1599
1600
1601    @test_retry_and_log
1602    def test_has_adapter(self):
1603        """Verify that there is an adapter. This will return True only if both
1604        the kernel and bluetooth daemon see the adapter.
1605        """
1606        return self.bluetooth_facade.has_adapter()
1607
1608    @test_retry_and_log
1609    def test_adapter_work_state(self):
1610        """Test that the bluetooth adapter is in the correct working state.
1611
1612        This includes that the adapter is detectable, is powered on,
1613        and its hci device is hci0.
1614        """
1615        has_adapter = self.bluetooth_facade.has_adapter()
1616        is_powered_on = self._wait_for_condition(
1617                self.bluetooth_facade.is_powered_on, method_name())
1618        hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
1619        self.results = {
1620                'has_adapter': has_adapter,
1621                'is_powered_on': is_powered_on,
1622                'hci': hci}
1623        return all(self.results.values())
1624
1625    @test_retry_and_log(False)
1626    def test_adapter_wake_enabled(self):
1627        """Test that the bluetooth adapter is wakeup enabled.
1628        """
1629        wake_enabled = self._wait_for_condition(
1630                self.bluetooth_facade.is_wake_enabled, method_name(),
1631                timeout=self.ADAPTER_WAKE_ENABLE_TIMEOUT_SECS)
1632
1633        self.results = { 'wake_enabled': wake_enabled }
1634        return any(self.results.values())
1635
1636    @test_retry_and_log(False)
1637    def test_device_wake_allowed(self, device_address):
1638        """Test that given device can wake the system."""
1639        self.results = {
1640                'Wake allowed':
1641                self.bluetooth_facade.get_device_property(
1642                        device_address, 'WakeAllowed')
1643        }
1644
1645        return all(self.results.values())
1646
1647    @test_retry_and_log(False)
1648    def test_device_wake_not_allowed(self, device_address):
1649        """Test that given device cannot wake the system."""
1650        self.results = {
1651                'Wake not allowed':
1652                not self.bluetooth_facade.get_device_property(
1653                        device_address, 'WakeAllowed')
1654        }
1655
1656        return all(self.results.values())
1657
1658    @test_retry_and_log(False)
1659    def test_adapter_set_wake_disabled(self):
1660        """Disable wake and verify it was written. """
1661        success = self.bluetooth_facade.set_wake_enabled(False)
1662        self.results = { 'disable_wake': success }
1663        return all(self.results.values())
1664
1665    @test_retry_and_log
1666    def test_power_on_adapter(self):
1667        """Test that the adapter could be powered on successfully."""
1668        power_on = self.bluetooth_facade.set_powered(True)
1669        is_powered_on = self._wait_for_condition(
1670                self.bluetooth_facade.is_powered_on, method_name())
1671
1672        self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
1673        return all(self.results.values())
1674
1675
1676    @test_retry_and_log
1677    def test_power_off_adapter(self):
1678        """Test that the adapter could be powered off successfully."""
1679        power_off = self.bluetooth_facade.set_powered(False)
1680        is_powered_off = self._wait_for_condition(
1681                lambda: not self.bluetooth_facade.is_powered_on(),
1682                method_name())
1683
1684        self.results = {
1685                'power_off': power_off,
1686                'is_powered_off': is_powered_off}
1687        return all(self.results.values())
1688
1689
1690    @test_retry_and_log
1691    def test_reset_on_adapter(self):
1692        """Test that the adapter could be reset on successfully.
1693
1694        This includes restarting bluetoothd, and removing the settings
1695        and cached devices.
1696        """
1697        reset_on = self.bluetooth_facade.reset_on()
1698        is_powered_on = self._wait_for_condition(
1699                self.bluetooth_facade.is_powered_on, method_name())
1700
1701        self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
1702        return all(self.results.values())
1703
1704
1705    @test_retry_and_log
1706    def test_reset_off_adapter(self):
1707        """Test that the adapter could be reset off successfully.
1708
1709        This includes restarting bluetoothd, and removing the settings
1710        and cached devices.
1711        """
1712        reset_off = self.bluetooth_facade.reset_off()
1713        is_powered_off = self._wait_for_condition(
1714                lambda: not self.bluetooth_facade.is_powered_on(),
1715                method_name())
1716
1717        self.results = {
1718                'reset_off': reset_off,
1719                'is_powered_off': is_powered_off}
1720        return all(self.results.values())
1721
1722
1723    def test_is_powered_off(self):
1724        """Check if the adapter is powered off."""
1725        is_powered_off = not self.bluetooth_facade.is_powered_on()
1726        self.results = {'is_powered_off': is_powered_off}
1727        return all(self.results.values())
1728
1729
1730    @test_retry_and_log(False)
1731    def test_is_facade_valid(self):
1732        """Checks whether the bluetooth facade is in a good state.
1733
1734        If bluetoothd restarts (i.e. due to a crash), the object proxies will no
1735        longer be valid (because the session will be closed). Check whether the
1736        session failed and wait for a new session if it did.
1737        """
1738        initially_ok = self.bluetooth_facade.is_bluetoothd_valid()
1739        daemon_started = initially_ok or self.bluetooth_facade.start_bluetoothd(
1740        )
1741        eventually_ok = initially_ok or self.bluetooth_facade.is_bluetoothd_valid(
1742        )
1743
1744        self.results = {
1745                'initially_ok': initially_ok,
1746                'eventually_ok': eventually_ok,
1747                'daemon_started': daemon_started,
1748        }
1749        return all(
1750                [self.results[x] for x in ['eventually_ok', 'daemon_started']])
1751
1752
1753    @test_retry_and_log(False)
1754    def test_is_adapter_valid(self):
1755        """Verify the bluetooth adapter is retrievable at test start
1756
1757        @raises: error.TestNAError if we fail to retrieve the adapter on
1758                    an unsupported chipset
1759                 error.TestFail if we fail to retrieve the adapter on any other
1760                    platform
1761
1762        @returns: True if the adapter was located properly
1763        """
1764
1765        if not self.bluetooth_facade.has_adapter():
1766            logging.error('No adapter available, rebooting to recover')
1767
1768            self.reboot()
1769
1770            chipset = self.bluetooth_facade.get_chipset_name()
1771
1772            if not chipset:
1773                raise error.TestFail('Unknown adapter is missing')
1774
1775            # A missing adapter is a rare but known issue on several platforms
1776            # that have no vendor support (b/169328792). Since there is no fix
1777            # possible, we forgive these failures by raising a TestNA.
1778            if chipset in UNSUPPORTED_CHIPSETS:
1779                raise error.TestNAError('Unsupported adapter is missing')
1780
1781            raise error.TestFail('Adapter is missing')
1782
1783        return True
1784
1785    @test_retry_and_log
1786    def test_UUIDs(self):
1787        """Test that basic profiles are supported."""
1788        adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
1789        self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
1790                        if uuid not in adapter_UUIDs]
1791        return not bool(self.results)
1792
1793
1794    @test_retry_and_log
1795    def test_start_discovery(self):
1796        """Test that the adapter could start discovery."""
1797        start_discovery, _ = self.bluetooth_facade.start_discovery()
1798        is_discovering = self._wait_for_condition(
1799                self.bluetooth_facade.is_discovering, method_name())
1800
1801        self.results = {
1802                'start_discovery': start_discovery,
1803                'is_discovering': is_discovering}
1804        return all(self.results.values())
1805
1806    @test_retry_and_log(False)
1807    def test_is_discovering(self):
1808        """Test that the adapter is already discovering."""
1809        is_discovering = self._wait_for_condition(
1810                self.bluetooth_facade.is_discovering, method_name())
1811
1812        self.results = {'is_discovering': is_discovering}
1813        return all(self.results.values())
1814
1815    @test_retry_and_log
1816    def test_stop_discovery(self):
1817        """Test that the adapter could stop discovery."""
1818        if not self.bluetooth_facade.is_discovering():
1819            return True
1820
1821        stop_discovery, _ = self.bluetooth_facade.stop_discovery()
1822        is_not_discovering = self._wait_for_condition(
1823                lambda: not self.bluetooth_facade.is_discovering(),
1824                method_name())
1825
1826        self.results = {
1827                'stop_discovery': stop_discovery,
1828                'is_not_discovering': is_not_discovering}
1829        return all(self.results.values())
1830
1831
1832    @test_retry_and_log
1833    def test_discoverable(self):
1834        """Test that the adapter could be set discoverable."""
1835        set_discoverable = self.bluetooth_facade.set_discoverable(True)
1836        is_discoverable = self._wait_for_condition(
1837                self.bluetooth_facade.is_discoverable, method_name())
1838
1839        self.results = {
1840                'set_discoverable': set_discoverable,
1841                'is_discoverable': is_discoverable}
1842        return all(self.results.values())
1843
1844    @test_retry_and_log(False)
1845    def test_is_discoverable(self):
1846        """Test that the adapter is discoverable."""
1847        is_discoverable = self._wait_for_condition(
1848                self.bluetooth_facade.is_discoverable, method_name())
1849
1850        self.results = {'is_discoverable': is_discoverable}
1851        return all(self.results.values())
1852
1853
1854    def _test_timeout_property(self, set_property, check_property, set_timeout,
1855                              get_timeout, property_name,
1856                              timeout_values = [0, 60, 180]):
1857        """Common method to test (Discoverable/Pairable)Timeout .
1858
1859        This is used to test
1860        - DiscoverableTimeout property
1861        - PairableTimeout property
1862
1863        The test performs the following
1864           - Set PropertyTimeout
1865           - Read PropertyTimeout and make sure values match
1866           - Set adapter propety
1867           - In a loop check if property is active
1868           - Test fails property is false before timeout
1869           - Test fails property is True after timeout
1870           Repeat the test for different values for timeout
1871
1872           Note : Value of 0 mean it never timeouts, so the test will
1873                 end after 30 seconds.
1874        """
1875        def check_timeout(timeout):
1876            """Check for timeout value in loop while recording failures."""
1877            actual_timeout = get_timeout()
1878            if timeout != actual_timeout:
1879                logging.debug('%s timeout value read %s does not '
1880                              'match value set %s, yet', property_name,
1881                              actual_timeout, timeout)
1882                return False
1883            else:
1884                return True
1885
1886        def _test_timeout_property(timeout):
1887            # minium time after timeout before checking property
1888            MIN_DELTA_SECS = 3
1889            # Time between checking  property
1890            WAIT_TIME_SECS = 2
1891
1892            # Set and read back the timeout value
1893            if not set_timeout(timeout):
1894                logging.error('Setting the %s timeout failed',property_name)
1895                return False
1896
1897
1898            if not self._wait_for_condition(lambda : check_timeout(timeout),
1899                                            'check_'+property_name):
1900                logging.error('checking %s_timeout value timed out',
1901                              property_name)
1902                return False
1903
1904            #
1905            # Check that the timeout works
1906            # Check property is true until timeout
1907            # and then it is not
1908
1909            property_set = set_property(True)
1910            property_is_true = self._wait_for_condition(check_property,
1911                                                        method_name())
1912
1913            self.results = { 'set_%s' % property_name : property_set,
1914                             'is_%s' % property_name: property_is_true}
1915            logging.debug(self.results)
1916
1917            if not all(self.results.values()):
1918                logging.error('Setting %s failed',property_name)
1919                return False
1920
1921            start_time = time.time()
1922            while True:
1923                time.sleep(WAIT_TIME_SECS)
1924                cur_time = time.time()
1925                property_set = check_property()
1926                time_elapsed = cur_time - start_time
1927
1928                # Ignore check_property results made near the timeout
1929                # to avoid spurious failures.
1930                if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS:
1931                    continue
1932
1933                # Timeout of zero seconds mean that the adapter never times out
1934                # Check for 30 seconds and then exit the test.
1935                if timeout == 0:
1936                    if not property_set:
1937                        logging.error('Adapter is not %s after %.2f '
1938                                      'secs with a timeout of zero ',
1939                                      property_name, time_elapsed)
1940                        return False
1941                    elif time_elapsed > 30:
1942                        logging.debug('Adapter %s after %.2f seconds '
1943                                      'with timeout of zero as expected' ,
1944                                      property_name, time_elapsed)
1945                        return True
1946                    continue
1947
1948                #
1949                # Check if property is true till timeout ends and
1950                # false afterwards
1951                #
1952                if time_elapsed < timeout:
1953                    if not property_set:
1954                        logging.error('Adapter is not %s after %.2f '
1955                                      'secs before timeout of %.2f',
1956                                      property_name, time_elapsed, timeout)
1957                        return False
1958                else:
1959                    if property_set:
1960                        logging.error('Adapter is still %s after '
1961                                      ' %.2f secs with timeout of %.2f',
1962                                      property_name, time_elapsed, timeout)
1963                        return False
1964                    else:
1965                        logging.debug('Adapter not %s after %.2f '
1966                                      'secs with timeout of %.2f as expected ',
1967                                      property_name, time_elapsed, timeout)
1968                        return True
1969
1970        default_value = check_property()
1971        default_timeout = get_timeout()
1972
1973        result = []
1974        try:
1975            for timeout in timeout_values:
1976                result.append(_test_timeout_property(timeout))
1977            logging.debug("Test returning %s", all(result))
1978            return all(result)
1979        except:
1980            logging.error("exception in test_%s_timeout",property_name)
1981            raise
1982        finally:
1983            # Set the property back to default value permanently before
1984            # exiting the test
1985            set_timeout(0)
1986            set_property(default_value)
1987            # Set the timeout back to default value before exiting the test
1988            set_timeout(default_timeout)
1989
1990
1991    @test_retry_and_log
1992    def test_discoverable_timeout(self, timeout_values = [0, 60, 180]):
1993        """Test adapter dbus property DiscoverableTimeout."""
1994        return self._test_timeout_property(
1995            set_property = self.bluetooth_facade.set_discoverable,
1996            check_property = self.bluetooth_facade.is_discoverable,
1997            set_timeout = self.bluetooth_facade.set_discoverable_timeout,
1998            get_timeout = self.bluetooth_facade.get_discoverable_timeout,
1999            property_name = 'discoverable',
2000            timeout_values = timeout_values)
2001
2002    @test_retry_and_log
2003    def test_pairable_timeout(self, timeout_values = [0, 60, 180]):
2004        """Test adapter dbus property PairableTimeout."""
2005        return self._test_timeout_property(
2006            set_property = self.bluetooth_facade.set_pairable,
2007            check_property = self.bluetooth_facade.is_pairable,
2008            set_timeout = self.bluetooth_facade.set_pairable_timeout,
2009            get_timeout = self.bluetooth_facade.get_pairable_timeout,
2010            property_name = 'pairable',
2011            timeout_values = timeout_values)
2012
2013
2014    @test_retry_and_log
2015    def test_nondiscoverable(self):
2016        """Test that the adapter could be set non-discoverable."""
2017        set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
2018        is_nondiscoverable = self._wait_for_condition(
2019                lambda: not self.bluetooth_facade.is_discoverable(),
2020                method_name())
2021
2022        self.results = {
2023                'set_nondiscoverable': set_nondiscoverable,
2024                'is_nondiscoverable': is_nondiscoverable}
2025        return all(self.results.values())
2026
2027
2028    @test_retry_and_log
2029    def test_pairable(self):
2030        """Test that the adapter could be set pairable."""
2031        set_pairable = self.bluetooth_facade.set_pairable(True)
2032        is_pairable = self._wait_for_condition(
2033                self.bluetooth_facade.is_pairable, method_name())
2034
2035        self.results = {
2036                'set_pairable': set_pairable,
2037                'is_pairable': is_pairable}
2038        return all(self.results.values())
2039
2040
2041    @test_retry_and_log
2042    def test_nonpairable(self):
2043        """Test that the adapter could be set non-pairable."""
2044        set_nonpairable = self.bluetooth_facade.set_pairable(False)
2045        is_nonpairable = self._wait_for_condition(
2046                lambda: not self.bluetooth_facade.is_pairable(), method_name())
2047
2048        self.results = {
2049                'set_nonpairable': set_nonpairable,
2050                'is_nonpairable': is_nonpairable}
2051        return all(self.results.values())
2052
2053
2054    @test_retry_and_log(False)
2055    def test_check_valid_adapter_id(self):
2056        """Fail if the Bluetooth ID is not in the correct format.
2057
2058        @returns True if adapter ID follows expected format, False otherwise
2059        """
2060
2061        device = self.get_base_platform_name()
2062        adapter_info = self.get_adapter_properties()
2063
2064        # Don't complete test if this is a reference board
2065        if device in self.REFERENCE_BOARDS:
2066            return True
2067
2068        modalias = adapter_info['Modalias']
2069        logging.debug('Saw Bluetooth ID of: %s', modalias)
2070
2071        # Valid Device ID is:
2072        # <00E0(Google)>/<C405(ChromeOS)>/<non-zero versionNumber>
2073        bt_format = 'bluetooth:v00E0pC405d(?!0000)'
2074
2075        if not re.match(bt_format, modalias):
2076            return False
2077
2078        return True
2079
2080
2081    @test_retry_and_log(False)
2082    def test_check_valid_alias(self):
2083        """Fail if the Bluetooth alias is not in the correct format.
2084
2085        @returns True if adapter alias follows expected format, False otherwise
2086        """
2087
2088        device = self.get_base_platform_name()
2089        adapter_info = self.get_adapter_properties()
2090
2091        # Don't complete test if this is a reference board
2092        if device in self.REFERENCE_BOARDS:
2093            return True
2094
2095        alias = adapter_info['Alias']
2096        logging.debug('Saw Bluetooth Alias of: %s', alias)
2097
2098        device_type = self.host.get_board_type().lower()
2099        alias_format = '%s_[a-z0-9]{4}' % device_type
2100
2101        self.results = {}
2102
2103        alias_was_correct = True
2104        if not re.match(alias_format, alias.lower()):
2105            alias_was_correct = False
2106            logging.info('unexpected alias %s found', alias)
2107            self.results['alias_found'] = alias
2108
2109        self.results['alias_was_correct'] = alias_was_correct
2110        return all(self.results.values())
2111
2112
2113    # -------------------------------------------------------------------
2114    # Tests about general discovering, pairing, and connection
2115    # -------------------------------------------------------------------
2116
2117
2118    @test_retry_and_log(False)
2119    def test_discover_device(self,
2120                             device_address,
2121                             start_discovery=True,
2122                             stop_discovery=True):
2123        """Test that the adapter could discover the specified device address.
2124
2125        @param device_address: Address of the device.
2126        @param start_discovery: Whether to start discovery. Set to False if you
2127                                call start_discovery before calling this.
2128        @param stop_discovery: Whether to stop discovery at the end. If this is
2129                               set to False, make sure to call
2130                               test_stop_discovery afterwards.
2131
2132        @returns: True if the device is found. False otherwise.
2133
2134        """
2135        discovery_stopped = False
2136        is_not_discovering = False
2137        device_discovered = False
2138        # If start discovery is not set, discovery must already be started
2139        discovery_started = not start_discovery
2140        has_device = self.bluetooth_facade.has_device
2141
2142        if start_discovery:
2143            if has_device(device_address):
2144                # Before starting a new discovery, remove the found device since
2145                # it is likely to be a temporary device, and we don't know when
2146                # it will be removed by bluez. Therefore, remove it and re-find
2147                # the device to ensure the device object exists for the
2148                # following test, e.g. test_pairing.
2149                logging.debug('Removing device %s to restart temporary timer',
2150                              device_address)
2151                self.bluetooth_facade.remove_device_object(device_address)
2152
2153            discovery_started = self.bluetooth_facade.start_discovery()
2154
2155        if discovery_started:
2156            try:
2157                utils.poll_for_condition(
2158                        condition=(lambda: has_device(device_address)),
2159                        timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
2160                        sleep_interval=self.
2161                        ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
2162                        desc='Waiting for discovering %s' % device_address)
2163                device_discovered = True
2164            except utils.TimeoutError as e:
2165                logging.error('test_discover_device: %s', e)
2166            except Exception as e:
2167                logging.error('test_discover_device: %s', e)
2168                err = ('bluetoothd probably crashed.'
2169                       'Check out /var/log/messages')
2170                logging.error(err)
2171            except:
2172                logging.error('test_discover_device: unexpected error')
2173
2174        if start_discovery and stop_discovery:
2175            discovery_stopped, _ = self.bluetooth_facade.stop_discovery()
2176            is_not_discovering = self._wait_for_condition(
2177                    lambda: not self.bluetooth_facade.is_discovering(),
2178                    method_name())
2179
2180        self.results = {
2181                'should_start_discovery': start_discovery,
2182                'should_stop_discovery': stop_discovery,
2183                'start_discovery': discovery_started,
2184                'stop_discovery': discovery_stopped,
2185                'is_not_discovering': is_not_discovering,
2186                'device_discovered': device_discovered}
2187
2188        # Make sure a discovered device properly started and stopped discovery
2189        device_found = device_discovered and (discovery_stopped
2190                                              and is_not_discovering
2191                                              if stop_discovery else True)
2192
2193        return device_found
2194
2195
2196    def _test_discover_by_device(self, device):
2197        return device.Discover(self.bluetooth_facade.address)
2198
2199    @test_retry_and_log(False, messages_start=False, messages_stop=False)
2200    def test_discover_by_device(self, device):
2201        """Test that the device could discover the adapter address.
2202
2203        @param device: Meta device to represent peer device.
2204
2205        @returns: True if the adapter is found by the device.
2206        """
2207        adapter_discovered = False
2208        discover_by_device = self._test_discover_by_device
2209        discovered_initially = discover_by_device(device)
2210
2211        if not discovered_initially:
2212            try:
2213                utils.poll_for_condition(
2214                        condition=(lambda: discover_by_device(device)),
2215                        timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
2216                        sleep_interval=
2217                        self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
2218                        desc='Waiting for adapter to be discovered')
2219                adapter_discovered = True
2220            except utils.TimeoutError as e:
2221                logging.error('test_discover_by_device: %s', e)
2222            except Exception as e:
2223                logging.error('test_discover_by_device: %s', e)
2224                err = ('bluetoothd probably crashed.'
2225                       'Check out /var/log/messages')
2226                logging.error(err)
2227            except:
2228                logging.error('test_discover_by_device: unexpected error')
2229
2230        self.results = {
2231            'adapter_discovered_initially': discovered_initially,
2232            'adapter_discovered': adapter_discovered
2233        }
2234        return any(self.results.values())
2235
2236    @test_retry_and_log(False, messages_start=False, messages_stop=False)
2237    def test_discover_by_device_fails(self, device):
2238        """Test that the device could not discover the adapter address.
2239
2240        @param device: Meta device to represent peer device.
2241
2242        @returns False if the adapter is found by the device.
2243        """
2244        self.results = {
2245                'adapter_discovered': self._test_discover_by_device(device)
2246        }
2247        return not any(self.results.values())
2248
2249    @test_retry_and_log(False, messages_start=False, messages_stop=False)
2250    def test_device_set_discoverable(self, device, discoverable):
2251        """Test that we could set the peer device to discoverable. """
2252        try:
2253            device.SetDiscoverable(discoverable)
2254        except:
2255            return False
2256
2257        return True
2258
2259    @test_retry_and_log
2260    def test_pairing(self, device_address, pin, trusted=True):
2261        """Test that the adapter could pair with the device successfully.
2262
2263        @param device_address: Address of the device.
2264        @param pin: pin code to pair with the device.
2265        @param trusted: indicating whether to set the device trusted.
2266
2267        @returns: True if pairing succeeds. False otherwise.
2268
2269        """
2270
2271        def _pair_device():
2272            """Pair to the device.
2273
2274            @returns: True if it could pair with, connect to, and retrieve
2275                      connection info from the device. False otherwise.
2276
2277            """
2278            self.results['paired'] = self.bluetooth_facade.pair_legacy_device(
2279                    device_address, pin, trusted,
2280                    self.ADAPTER_PAIRING_TIMEOUT_SECS)
2281            self.results[
2282                    'connected'] = self.bluetooth_facade.device_is_connected(
2283                            device_address)
2284            self.results[
2285                    'connection_info_retrievable'] = self.bluetooth_facade.has_connection_info(
2286                            device_address)
2287
2288            return self.results['paired'] and self.results[
2289                    'connected'] and self.results['connection_info_retrievable']
2290
2291        self.results = {
2292                'has_device': False,
2293                'paired': False,
2294                'connected': False,
2295                'connection_info_retrievable': False,
2296                'connection_num':
2297                self.bluetooth_facade.get_num_connected_devices() + 1
2298        }
2299
2300        if self.bluetooth_facade.has_device(device_address):
2301            self.results['has_device'] = True
2302            try:
2303                utils.poll_for_condition(
2304                        condition=_pair_device,
2305                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
2306                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2307                        desc='Waiting for pairing %s' % device_address)
2308            except utils.TimeoutError as e:
2309                logging.error('test_pairing: %s', e)
2310            except:
2311                logging.error('test_pairing: unexpected error')
2312
2313        return all(self.results.values())
2314
2315    @test_retry_and_log
2316    def test_remove_pairing(self, device_address):
2317        """Test that the adapter could remove the paired device.
2318
2319        @param device_address: Address of the device.
2320
2321        @returns: True if the device is removed successfully. False otherwise.
2322
2323        """
2324        device_is_paired_initially = self.bluetooth_facade.device_is_paired(
2325                device_address)
2326        remove_pairing = False
2327        pairing_removed = False
2328
2329        if device_is_paired_initially:
2330            remove_pairing = self.bluetooth_facade.remove_device_object(
2331                    device_address)
2332            pairing_removed = not self.bluetooth_facade.device_is_paired(
2333                    device_address)
2334
2335        self.results = {
2336                'device_is_paired_initially': device_is_paired_initially,
2337                'remove_pairing': remove_pairing,
2338                'pairing_removed': pairing_removed}
2339        return all(self.results.values())
2340
2341
2342    def test_set_trusted(self, device_address, trusted=True):
2343        """Test whether the device with the specified address is trusted.
2344
2345        @param device_address: Address of the device.
2346        @param trusted : True or False indicating if trusted is expected.
2347
2348        @returns: True if the device's "Trusted" property is as specified;
2349                  False otherwise.
2350
2351        """
2352
2353        set_trusted = self.bluetooth_facade.set_trusted(
2354                device_address, trusted)
2355
2356        actual_trusted = self.bluetooth_facade.get_device_property(
2357                                device_address, 'Trusted')
2358
2359        self.results = {
2360                'set_trusted': set_trusted,
2361                'actual trusted': actual_trusted,
2362                'expected trusted': trusted}
2363        return actual_trusted == trusted
2364
2365
2366    @test_retry_and_log
2367    def test_connection_by_adapter(self, device_address):
2368        """Test that the adapter of dut could connect to the device successfully
2369
2370        It is the caller's responsibility to pair to the device before
2371        doing connection.
2372
2373        @param device_address: Address of the device.
2374
2375        @returns: True if connection is performed. False otherwise.
2376
2377        """
2378
2379        def _connect_device():
2380            """Connect to the device.
2381
2382            @returns: True if it could connect to the device. False otherwise.
2383
2384            """
2385            return self.bluetooth_facade.connect_device(device_address)
2386
2387
2388        has_device = False
2389        connected = False
2390        if self.bluetooth_facade.has_device(device_address):
2391            has_device = True
2392            try:
2393                utils.poll_for_condition(
2394                        condition=_connect_device,
2395                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
2396                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2397                        desc='Waiting for connecting to %s' % device_address)
2398                connected = True
2399            except utils.TimeoutError as e:
2400                logging.error('test_connection_by_adapter: %s', e)
2401            except:
2402                logging.error('test_connection_by_adapter: unexpected error')
2403
2404        self.results = {'has_device': has_device, 'connected': connected}
2405        return all(self.results.values())
2406
2407
2408    @test_retry_and_log
2409    def test_disconnection_by_adapter(self, device_address):
2410        """Test that the adapter of dut could disconnect the device successfully
2411
2412        @param device_address: Address of the device.
2413
2414        @returns: True if disconnection is performed. False otherwise.
2415
2416        """
2417        return self.bluetooth_facade.disconnect_device(device_address)
2418
2419
2420    def _enter_command_mode(self, device):
2421        """Let the device enter command mode.
2422
2423        Before using the device, need to call this method to make sure
2424        it is in the command mode.
2425
2426        @param device: the bluetooth HID device
2427
2428        @returns: True if successful. False otherwise.
2429
2430        """
2431        result = _is_successful(_run_method(device.EnterCommandMode,
2432                                            'EnterCommandMode'))
2433        if not result:
2434            logging.error('EnterCommandMode failed')
2435        return result
2436
2437
2438    @test_retry_and_log
2439    def test_connection_by_device(
2440            self, device, post_connection_delay=ADAPTER_HID_INPUT_DELAY):
2441        """Test that the device could connect to the adapter successfully.
2442
2443        This emulates the behavior that a device may initiate a
2444        connection request after waking up from power saving mode.
2445
2446        @param device: the bluetooth HID device
2447        @param post_connection_delay: the delay introduced post connection to
2448                                      allow profile functionality to be ready
2449
2450        @returns: True if connection is performed correctly by device and
2451                  the adapter also enters connection state.
2452                  False otherwise.
2453
2454        """
2455        if not self._enter_command_mode(device):
2456            return False
2457
2458        method_name = 'test_connection_by_device'
2459        connection_by_device = False
2460        adapter_address = self.bluetooth_facade.address
2461        try:
2462            connection_by_device = device.ConnectToRemoteAddress(
2463                adapter_address)
2464        except Exception as e:
2465            logging.error('%s (device): %s', method_name, e)
2466        except:
2467            logging.error('%s (device): unexpected error', method_name)
2468
2469        connection_seen_by_adapter = False
2470        device_address = device.address
2471        device_is_connected = self.bluetooth_facade.device_is_connected
2472        try:
2473            utils.poll_for_condition(
2474                    condition=lambda: device_is_connected(device_address),
2475                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
2476                    desc=('Waiting for connection from %s' % device_address))
2477            connection_seen_by_adapter = True
2478
2479            # Although the connect may be complete, it can take a few
2480            # seconds for the input device to be ready for use
2481            time.sleep(post_connection_delay)
2482        except utils.TimeoutError as e:
2483            logging.error('%s (adapter): %s', method_name, e)
2484        except:
2485            logging.error('%s (adapter): unexpected error', method_name)
2486
2487        self.results = {
2488                'connection_by_device': connection_by_device,
2489                'connection_seen_by_adapter': connection_seen_by_adapter}
2490        return all(self.results.values())
2491
2492    @test_retry_and_log(True, messages_start=False, messages_stop=False)
2493    def test_connection_by_device_only(self, device, adapter_address):
2494        """Test that the device could connect to adapter successfully.
2495
2496        This is a modified version of test_connection_by_device that only
2497        communicates with the peer device and not the host (in case the host is
2498        suspended for example).
2499
2500        @param device: the bluetooth peer device
2501        @param adapter_address: address of the adapter
2502
2503        @returns: True if the connection was established by the device or False.
2504        """
2505        connected = device.ConnectToRemoteAddress(adapter_address)
2506        if connected:
2507            # Although the connect may be complete, it can take a few
2508            # seconds for the input device to be ready for use
2509            time.sleep(self.ADAPTER_HID_INPUT_DELAY)
2510
2511        self.results = {
2512            'connection_by_device': connected
2513        }
2514
2515        return all(self.results.values())
2516
2517
2518    @test_retry_and_log
2519    def test_disconnection_by_device(self, device):
2520        """Test that the device could disconnect the adapter successfully.
2521
2522        This emulates the behavior that a device may initiate a
2523        disconnection request before going into power saving mode.
2524
2525        Note: should not try to enter command mode in this method. When
2526              a device is connected, there is no way to enter command mode.
2527              One could just issue a special disconnect command without
2528              entering command mode.
2529
2530        @param device: the bluetooth HID device
2531
2532        @returns: True if disconnection is performed correctly by device and
2533                  the adapter also observes the disconnection.
2534                  False otherwise.
2535
2536        """
2537        # TODO(b/182864322) - remove the following statement when the bug
2538        # is fixed.
2539        device.SetRemoteAddress(self.bluetooth_facade.address)
2540
2541        method_name = 'test_disconnection_by_device'
2542        disconnection_by_device = False
2543        try:
2544            device.Disconnect()
2545            disconnection_by_device = True
2546        except Exception as e:
2547            logging.error('%s (device): %s', method_name, e)
2548        except:
2549            logging.error('%s (device): unexpected error', method_name)
2550
2551        disconnection_seen_by_adapter = False
2552        device_address = device.address
2553        device_is_connected = self.bluetooth_facade.device_is_connected
2554        try:
2555            utils.poll_for_condition(
2556                    condition=lambda: not device_is_connected(device_address),
2557                    timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
2558                    desc=('Waiting for disconnection from %s' % device_address))
2559            disconnection_seen_by_adapter = True
2560        except utils.TimeoutError as e:
2561            logging.error('%s (adapter): %s', method_name, e)
2562        except:
2563            logging.error('%s (adapter): unexpected error', method_name)
2564
2565        self.results = {
2566                'disconnection_by_device': disconnection_by_device,
2567                'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
2568        return all(self.results.values())
2569
2570
2571    @test_retry_and_log(False)
2572    def test_device_is_connected(
2573            self,
2574            device_address,
2575            timeout=ADAPTER_CONNECTION_TIMEOUT_SECS,
2576            sleep_interval=ADAPTER_PAIRING_POLLING_SLEEP_SECS):
2577        """Test that device address given is currently connected.
2578
2579        @param device_address: Address of the device.
2580        @param timeout: maximum number of seconds to wait
2581        @param sleep_interval: time to sleep between polls
2582
2583        @returns: True if the device is connected.
2584                  False otherwise.
2585        """
2586
2587        def _is_connected():
2588            """Test if device is connected.
2589
2590            @returns: True if device is connected. False otherwise.
2591
2592            """
2593            return self.bluetooth_facade.device_is_connected(device_address)
2594
2595        method_name = 'test_device_is_connected'
2596        has_device = False
2597        connected = False
2598        if self.bluetooth_facade.has_device(device_address):
2599            has_device = True
2600            try:
2601                utils.poll_for_condition(
2602                        condition=_is_connected,
2603                        timeout=timeout,
2604                        sleep_interval=sleep_interval,
2605                        desc='Waiting to check connection to %s' %
2606                        device_address)
2607                connected = True
2608            except utils.TimeoutError as e:
2609                logging.error('%s: %s', method_name, e)
2610            except:
2611                logging.error('%s: unexpected error', method_name)
2612        self.results = {'has_device': has_device, 'connected': connected}
2613        return all(self.results.values())
2614
2615
2616    @test_retry_and_log(False)
2617    def test_device_is_not_connected(self, device_address):
2618        """Test that device address given is NOT currently connected.
2619
2620        @param device_address: Address of the device.
2621
2622        @returns: True if the device is NOT connected.
2623                  False otherwise.
2624
2625        """
2626
2627        def _is_not_connected():
2628            """Test if device is not connected.
2629
2630            @returns: True if device is not connected. False otherwise.
2631
2632            """
2633            return not self.bluetooth_facade.device_is_connected(
2634                    device_address)
2635
2636
2637        method_name = 'test_device_is_not_connected'
2638        not_connected = False
2639        if self.bluetooth_facade.has_device(device_address):
2640            try:
2641                utils.poll_for_condition(
2642                        condition=_is_not_connected,
2643                        timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
2644                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2645                        desc='Waiting to check connection to %s' %
2646                              device_address)
2647                not_connected = True
2648            except utils.TimeoutError as e:
2649                logging.error('%s: %s', method_name, e)
2650            except:
2651                logging.error('%s: unexpected error', method_name)
2652                raise
2653        else:
2654            not_connected = True
2655        self.results = {'not_connected': not_connected}
2656        return all(self.results.values())
2657
2658
2659    @test_retry_and_log
2660    def test_device_is_paired(self, device_address):
2661        """Test that the device address given is currently paired.
2662
2663        @param device_address: Address of the device.
2664
2665        @returns: True if the device is paired.
2666                  False otherwise.
2667
2668        """
2669        def _is_paired():
2670            """Test if device is paired.
2671
2672            @returns: True if device is paired. False otherwise.
2673
2674            """
2675            return self.bluetooth_facade.device_is_paired(device_address)
2676
2677
2678        method_name = 'test_device_is_paired'
2679        has_device = False
2680        paired = False
2681        if self.bluetooth_facade.has_device(device_address):
2682            has_device = True
2683            try:
2684                utils.poll_for_condition(
2685                        condition=_is_paired,
2686                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
2687                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2688                        desc='Waiting for connection to %s' % device_address)
2689                paired = True
2690            except utils.TimeoutError as e:
2691                logging.error('%s: %s', method_name, e)
2692            except:
2693                logging.error('%s: unexpected error', method_name)
2694        self.results = {'has_device': has_device, 'paired': paired}
2695        return all(self.results.values())
2696
2697
2698    def _get_device_name(self, device_address):
2699        """Get the device name.
2700
2701        @returns: True if the device name is derived. None otherwise.
2702
2703        """
2704
2705        self.discovered_device_name = self.bluetooth_facade.get_device_property(
2706                                device_address, 'Name')
2707
2708        return bool(self.discovered_device_name)
2709
2710
2711    @test_retry_and_log
2712    def test_device_name(self, device_address, expected_device_name):
2713        """Test that the device name discovered by the adapter is correct.
2714
2715        @param device_address: Address of the device.
2716        @param expected_device_name: the bluetooth device name
2717
2718        @returns: True if the discovered_device_name is expected_device_name.
2719                  False otherwise.
2720
2721        """
2722        try:
2723            utils.poll_for_condition(
2724                    condition=lambda: self._get_device_name(device_address),
2725                    timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
2726                    sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
2727                    desc='Waiting for device name of %s' % device_address)
2728        except utils.TimeoutError as e:
2729            logging.error('test_device_name: %s', e)
2730        except:
2731            logging.error('test_device_name: unexpected error')
2732
2733        self.results = {
2734                'expected_device_name': expected_device_name,
2735                'discovered_device_name': self.discovered_device_name}
2736        return self.discovered_device_name == expected_device_name
2737
2738
2739    @test_retry_and_log
2740    def test_device_class_of_service(self, device_address,
2741                                     expected_class_of_service):
2742        """Test that the discovered device class of service is as expected.
2743
2744        @param device_address: Address of the device.
2745        @param expected_class_of_service: the expected class of service
2746
2747        @returns: True if the discovered class of service matches the
2748                  expected class of service. False otherwise.
2749
2750        """
2751
2752        device_class = self.bluetooth_facade.get_device_property(device_address,
2753                                                                 'Class')
2754        discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
2755                                       if device_class else None)
2756
2757        self.results = {
2758                'device_class': device_class,
2759                'expected_class_of_service': expected_class_of_service,
2760                'discovered_class_of_service': discovered_class_of_service}
2761        return discovered_class_of_service == expected_class_of_service
2762
2763
2764    @test_retry_and_log
2765    def test_device_class_of_device(self, device_address,
2766                                    expected_class_of_device):
2767        """Test that the discovered device class of device is as expected.
2768
2769        @param device_address: Address of the device.
2770        @param expected_class_of_device: the expected class of device
2771
2772        @returns: True if the discovered class of device matches the
2773                  expected class of device. False otherwise.
2774
2775        """
2776
2777        device_class = self.bluetooth_facade.get_device_property(device_address,
2778                                                                 'Class')
2779        discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
2780                                      if device_class else None)
2781
2782        self.results = {
2783                'device_class': device_class,
2784                'expected_class_of_device': expected_class_of_device,
2785                'discovered_class_of_device': discovered_class_of_device}
2786        return discovered_class_of_device == expected_class_of_device
2787
2788
2789    def _get_btmon_log(self, method, logging_timespan=1):
2790        """Capture the btmon log when executing the specified method.
2791
2792        @param method: the method to capture log.
2793                       The method would be executed only when it is not None.
2794                       This allows us to simply capture btmon log without
2795                       executing any command.
2796        @param logging_timespan: capture btmon log for logging_timespan seconds.
2797
2798        """
2799        self.bluetooth_le_facade.btmon_start()
2800        self.advertising_msg = method() if method else ''
2801        time.sleep(logging_timespan)
2802        self.bluetooth_le_facade.btmon_stop()
2803
2804
2805    def convert_to_adv_jiffies(self, adv_interval_ms):
2806        """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
2807
2808        @param adv_interval_ms: an advertising interval
2809
2810        @returns: the equivalent jiffies
2811
2812        """
2813        return int(round(adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
2814
2815
2816    def compute_duration(self, max_adv_interval_ms):
2817        """Compute duration from max_adv_interval_ms.
2818
2819        Advertising duration is calculated approximately as
2820            duration = max_adv_interval_ms / 1000.0 * 1.1
2821
2822        @param max_adv_interval_ms: max advertising interval in milliseconds.
2823
2824        @returns: duration in seconds.
2825
2826        """
2827        return max_adv_interval_ms / 1000.0 * 1.1
2828
2829
2830    def compute_logging_timespan(self, max_adv_interval_ms):
2831        """Compute the logging timespan from max_adv_interval_ms.
2832
2833        The logging timespan is the time needed to record btmon log.
2834
2835        @param max_adv_interval_ms: max advertising interval in milliseconds.
2836
2837        @returns: logging_timespan in seconds.
2838
2839        """
2840        duration = self.compute_duration(max_adv_interval_ms)
2841        logging_timespan = max(self.count_advertisements * duration, 1)
2842        return logging_timespan
2843
2844
2845    @test_retry_and_log(False)
2846    def test_check_duration_and_intervals(self, min_adv_interval_ms,
2847                                          max_adv_interval_ms,
2848                                          number_advertisements):
2849        """Verify that every advertisements are scheduled according to the
2850        duration and intervals.
2851
2852        An advertisement would be scheduled at the time span of
2853             duration * number_advertisements
2854
2855        @param min_adv_interval_ms: min advertising interval in milliseconds.
2856        @param max_adv_interval_ms: max advertising interval in milliseconds.
2857        @param number_advertisements: the number of existing advertisements
2858
2859        @returns: True if all advertisements are scheduled based on the
2860                duration and intervals.
2861
2862        """
2863
2864
2865        def within_tolerance(expected, actual, max_error=0.1):
2866            """Determine if the percent error is within specified tolerance.
2867
2868            @param expected: The expected value.
2869            @param actual: The actual (measured) value.
2870            @param max_error: The maximum percent error acceptable.
2871
2872            @returns: True if the percent error is less than or equal to
2873                      max_error.
2874            """
2875            return abs(expected - actual) / abs(expected) <= max_error
2876
2877
2878        start_str = 'Set Advertising Intervals:'
2879        search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
2880        search_str = '|'.join(search_strings)
2881
2882        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
2883                                                      start_str=start_str)
2884
2885        # Company string looks like
2886        #   Company: not assigned (65283)
2887        company_pattern = re.compile('Company:.*\((\d*)\)')
2888
2889        # The string with timestamp looks like
2890        #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
2891        set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
2892        set_adv_time_pattern = re.compile(set_adv_time_str)
2893
2894        adv_timestamps = {}
2895        timestamp = None
2896        manufacturer_id = None
2897        for line in contents:
2898            result = set_adv_time_pattern.search(line)
2899            if result:
2900                timestamp = float(result.group(1))
2901
2902            result = company_pattern.search(line)
2903            if result:
2904                manufacturer_id = '0x%04x' % int(result.group(1))
2905
2906            if timestamp and manufacturer_id:
2907                if manufacturer_id not in adv_timestamps:
2908                    adv_timestamps[manufacturer_id] = []
2909                adv_timestamps[manufacturer_id].append(timestamp)
2910                timestamp = None
2911                manufacturer_id = None
2912
2913        duration = self.compute_duration(max_adv_interval_ms)
2914        expected_timespan = duration * number_advertisements
2915
2916        check_duration = True
2917        for manufacturer_id, values in six.iteritems(adv_timestamps):
2918            logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
2919            timespans = [values[i] - values[i - 1]
2920                         for i in range(1, len(values))]
2921            errors = [timespans[i] for i in range(len(timespans))
2922                      if not within_tolerance(expected_timespan, timespans[i])]
2923            logging.debug('timespans: %s', timespans)
2924            logging.debug('errors: %s', errors)
2925            if bool(errors):
2926                check_duration = False
2927
2928        # Verify that the advertising intervals are also correct.
2929        min_adv_interval_ms_found, max_adv_interval_ms_found = (
2930                self._verify_advertising_intervals(min_adv_interval_ms,
2931                                                   max_adv_interval_ms))
2932
2933        self.results = {
2934                'check_duration': check_duration,
2935                'min_adv_interval_ms_found': min_adv_interval_ms_found,
2936                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2937        }
2938        return all(self.results.values())
2939
2940
2941    def _get_min_max_intervals_strings(self, min_adv_interval_ms,
2942                                       max_adv_interval_ms):
2943        """Get the min and max advertising intervals strings shown in btmon.
2944
2945        Advertising intervals shown in the btmon log look like
2946            Min advertising interval: 1280.000 msec (0x0800)
2947            Max advertising interval: 1280.000 msec (0x0800)
2948
2949        @param min_adv_interval_ms: min advertising interval in milliseconds.
2950        @param max_adv_interval_ms: max advertising interval in milliseconds.
2951
2952        @returns: the min and max intervals strings.
2953
2954        """
2955        min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
2956                   (min_adv_interval_ms,
2957                    self.convert_to_adv_jiffies(min_adv_interval_ms)))
2958        logging.debug('min_adv_interval_ms: %s', min_str)
2959
2960        max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
2961                   (max_adv_interval_ms,
2962                    self.convert_to_adv_jiffies(max_adv_interval_ms)))
2963        logging.debug('max_adv_interval_ms: %s', max_str)
2964
2965        return (min_str, max_str)
2966
2967
2968    def _verify_advertising_intervals(self, min_adv_interval_ms,
2969                                      max_adv_interval_ms):
2970        """Verify min and max advertising intervals.
2971
2972        Advertising intervals look like
2973            Min advertising interval: 1280.000 msec (0x0800)
2974            Max advertising interval: 1280.000 msec (0x0800)
2975
2976        @param min_adv_interval_ms: min advertising interval in milliseconds.
2977        @param max_adv_interval_ms: max advertising interval in milliseconds.
2978
2979        @returns: a tuple of (True, True) if both min and max advertising
2980            intervals could be found. Otherwise, the corresponding element
2981            in the tuple if False.
2982
2983        """
2984        min_str, max_str = self._get_min_max_intervals_strings(
2985                min_adv_interval_ms, max_adv_interval_ms)
2986
2987        min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
2988        max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
2989
2990        return min_adv_interval_ms_found, max_adv_interval_ms_found
2991
2992
2993    def _verify_scan_response_data(self, adv_data):
2994        """Verify advertisement's scan response data is correct
2995
2996        Unlike the other fixed advertising fields, Scan Response Data is set
2997        in a tag-value data format. This function helps verify the data format
2998        for specific tag values to ensure scan response was propagated correctly
2999
3000        @param adv_data: Dictionary defining advertising fields to be registered
3001                with bluetoothd daemon's RegisterAdvertisement interface
3002
3003        @returns: True if all Registered Scan Response tags were located in
3004                btmon trace, False otherwise
3005        """
3006
3007        scan_rsp = adv_data.get('ScanResponseData')
3008        if not scan_rsp:
3009            return True
3010
3011        for tag, data in scan_rsp.items():
3012            # Validate 16 bit Service Data tag
3013            if int(tag, 16) == 0x16:
3014                # First two bytes of data are endian-corrected UUID, followed
3015                # by service data
3016                uuid = '%x%x' % (data[1], data[0])
3017                data_str = ''.join(
3018                        ['%02x' % data[i] for i in range(2, len(data))])
3019
3020                # Service data has the following format in btmon trace:
3021                # Service Data (UUID 0xfef3): 01020304
3022                search_str = 'Service Data (UUID 0x{}): {}'.format(
3023                        uuid, data_str)
3024
3025                # Fail if data can't be located in btmon trace
3026                if not self.bluetooth_le_facade.btmon_find(search_str):
3027                    return False
3028
3029        return True
3030
3031    def test_advertising_flags(self, flag_strs=[]):
3032        """Verify that advertising flags are set in registered advertisement
3033
3034        Each flag has a specific descriptor that appears in btmon trace. This
3035        simple checker validates that the desired flag descriptors appear in
3036        btmon trace when the advertisement was registered.
3037
3038        @param flag_strs: Flag string descriptors expected in btmon trace
3039
3040        #returns: True if all flag descriptors were located, False otherwise
3041        """
3042
3043        for flag_str in flag_strs:
3044            if not self.bluetooth_le_facade.btmon_find(flag_str):
3045                logging.info(
3046                        'Flag descriptor not located: {}'.format(flag_str))
3047                return False
3048
3049        return True
3050
3051    def ext_adv_enabled(self):
3052        """ Check if platform supports extended advertising
3053
3054        @returns True if extended advertising is supported, else False
3055        """
3056
3057        adv_features = self.bluetooth_facade.get_advertising_manager_property(
3058                'SupportedFeatures')
3059
3060        return 'HardwareOffload' in adv_features
3061
3062    def _verify_adv_tx_power(self, advertising_data):
3063        """ Verify that advertisement uses Tx Power correctly via the following:
3064
3065            1. Confirm the correct Tx Power is propagated in both MGMT and
3066                HCI commands.
3067            2. Validate that the Tx Power selected by the controller is
3068                returned to the client via dbus.
3069
3070        @param: advertising_data: dictionary of advertising data properties
3071            used to register the advertisement
3072
3073        @returns: True if the above requirements are met, False otherwise
3074        """
3075
3076        # If we aren't using TxPower in this advertisement, success
3077        if not self.ext_adv_enabled() or 'TxPower' not in advertising_data:
3078            return True
3079
3080        # Make sure the correct Tx power was passed in both MGMT and HCI
3081        # commands by searching for two instances of search string
3082        search_str = 'TX power: {} dbm'.format(advertising_data['TxPower'])
3083        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
3084                                                      start_str='')
3085        if len(contents) < 2:
3086            logging.error('Could not locate correct Tx power in MGMT and HCI')
3087            return False
3088
3089        # Locate tx power selected by controller
3090        search_str = 'TX power \(selected\)'
3091        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
3092                                                      start_str='')
3093
3094        if not contents:
3095            logging.error('No Tx Power selected event found, failing')
3096            return False
3097
3098        # The line we want has the following structure:
3099        # 'TX power (selected): -5 dbm (0x07)'
3100        # We locate the number before 'dbm'
3101        items = contents[0].split(' ')
3102        selected_tx_power = int(items[items.index('dbm') - 1])
3103
3104        # Validate that client's advertisement was updated correctly.
3105        new_tx_prop = self.bluetooth_le_facade.get_advertisement_property(
3106                advertising_data['Path'], 'TxPower')
3107
3108        return new_tx_prop == selected_tx_power
3109
3110    @test_retry_and_log(False)
3111    def test_register_advertisement(self, advertisement_data, instance_id):
3112        """Verify that an advertisement is registered correctly.
3113
3114        This test verifies the following data:
3115        - advertisement added
3116        - manufacturer data
3117        - service UUIDs
3118        - service data
3119        - advertising intervals
3120        - advertising enabled
3121        - Tx power set (if extended advertising available)
3122
3123        @param advertisement_data: the data of an advertisement to register.
3124        @param instance_id: the instance id which starts at 1.
3125
3126        @returns: True if the advertisement is registered correctly.
3127                  False otherwise.
3128
3129        """
3130
3131        # We need to know the intervals used to verify later. If advertisement
3132        # structure contains it, use them. Otherwise, use bluez's defaults
3133        if set(advertisement_data) >= {'MinInterval', 'MaxInterval'}:
3134            min_adv_interval_ms = advertisement_data['MinInterval']
3135            max_adv_interval_ms = advertisement_data['MaxInterval']
3136
3137        else:
3138            min_adv_interval_ms = self.DEFAULT_MIN_ADVERTISEMENT_INTERVAL_MS
3139            max_adv_interval_ms = self.DEFAULT_MAX_ADVERTISEMENT_INTERVAL_MS
3140
3141        # When registering a new advertisement, it is possible that another
3142        # instance is advertising. It may need to wait for all other
3143        # advertisements to complete advertising once.
3144        self.count_advertisements += 1
3145        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
3146        self._get_btmon_log(
3147                lambda: self.bluetooth_le_facade.register_advertisement(
3148                        advertisement_data),
3149                logging_timespan=logging_timespan)
3150
3151        # _get_btmon_log will store the return value of the registration request
3152        # in self.advertising_msg. If the request was successful, the return
3153        # value was an empty string
3154        registration_succeeded = (self.advertising_msg == '')
3155
3156        # Verify that a new advertisement is added.
3157        advertisement_added = (
3158                self.bluetooth_le_facade.btmon_find('Advertising Added') and
3159                self.bluetooth_le_facade.btmon_find('Instance: %d' %
3160                                                    instance_id))
3161
3162        # Verify that the manufacturer data could be found.
3163        manufacturer_data = advertisement_data.get('ManufacturerData', '')
3164        manufacturer_data_found = True
3165        for manufacturer_id in manufacturer_data:
3166            # The 'not assigned' text below means the manufacturer id
3167            # is not actually assigned to any real manufacturer.
3168            # For real 16-bit manufacturer UUIDs, refer to
3169            #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
3170            manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
3171                    'Company: not assigned (%d)' % int(manufacturer_id, 16))
3172
3173        # Verify that all service UUIDs could be found.
3174        service_uuids_found = True
3175        for uuid in advertisement_data.get('ServiceUUIDs', []):
3176            # Service UUIDs looks like ['0x180D', '0x180F']
3177            #   Heart Rate (0x180D)
3178            #   Battery Service (0x180F)
3179            # For actual 16-bit service UUIDs, refer to
3180            #   https://www.bluetooth.com/specifications/gatt/services
3181            if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
3182                service_uuids_found = False
3183                break
3184
3185        # Verify service data.
3186        service_data_found = True
3187        for uuid, data in advertisement_data.get('ServiceData', {}).items():
3188            # A service data looks like
3189            #   Service Data (UUID 0x9999): 0001020304
3190            # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
3191            data_str = ''.join(['%02x' % n for n in data])
3192            if not self.bluetooth_le_facade.btmon_find(
3193                    'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
3194                service_data_found = False
3195                break
3196
3197        # Broadcast advertisements are overwritten in some kernel versions to
3198        # be more aggressive. Verify that the advertising intervals are correct
3199        # if this mode is not used
3200        if advertisement_data.get('Type') != 'broadcast':
3201            min_adv_interval_ms_found, max_adv_interval_ms_found = (
3202                    self._verify_advertising_intervals(min_adv_interval_ms,
3203                                                       max_adv_interval_ms))
3204
3205        else:
3206            min_adv_interval_ms_found = True
3207            max_adv_interval_ms_found = True
3208
3209        scan_rsp_correct = self._verify_scan_response_data(advertisement_data)
3210
3211        # Verify advertising is enabled.
3212        advertising_enabled = self.bluetooth_le_facade.btmon_find(
3213                'Advertising: Enabled (0x01)')
3214
3215        # Verify new APIs were used
3216        new_apis_used = self.bluetooth_le_facade.btmon_find(
3217                'Add Extended Advertising Parameters')
3218
3219        tx_power_correct = self._verify_adv_tx_power(advertisement_data)
3220
3221        self.results = {
3222                'registration_succeeded': registration_succeeded,
3223                'advertisement_added': advertisement_added,
3224                'manufacturer_data_found': manufacturer_data_found,
3225                'service_uuids_found': service_uuids_found,
3226                'service_data_found': service_data_found,
3227                'min_adv_interval_ms_found': min_adv_interval_ms_found,
3228                'max_adv_interval_ms_found': max_adv_interval_ms_found,
3229                'scan_rsp_correct': scan_rsp_correct,
3230                'advertising_enabled': advertising_enabled,
3231                'new_apis_used': new_apis_used,
3232                'tx_power_correct': tx_power_correct,
3233        }
3234        return all(self.results.values())
3235
3236
3237    @test_retry_and_log(False)
3238    def test_fail_to_register_advertisement(self, advertisement_data,
3239                                            min_adv_interval_ms,
3240                                            max_adv_interval_ms):
3241        """Verify that failure is incurred when max advertisements are reached.
3242
3243        This test verifies that a registration failure is incurred when
3244        max advertisements are reached. The error message looks like:
3245
3246            org.bluez.Error.Failed: Maximum advertisements reached
3247
3248        @param advertisement_data: the advertisement to register.
3249        @param min_adv_interval_ms: min_adv_interval in milliseconds.
3250        @param max_adv_interval_ms: max_adv_interval in milliseconds.
3251
3252        @returns: True if the error message is received correctly.
3253                  False otherwise.
3254
3255        """
3256        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
3257        self._get_btmon_log(
3258                lambda: self.bluetooth_le_facade.register_advertisement(
3259                        advertisement_data),
3260                logging_timespan=logging_timespan)
3261
3262        # Verify that it failed to register advertisement due to the fact
3263        # that max advertisements are reached.
3264        failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT
3265                                    in self.advertising_msg)
3266
3267        # Verify that no new advertisement is added.
3268        advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
3269                'Advertising Added:')
3270
3271        self.results = {
3272                'failed_to_register_error': failed_to_register_error,
3273                'advertisement_not_added': advertisement_not_added,
3274        }
3275
3276        # If the registration fails and extended advertising is available,
3277        # there will be no events in btmon. Therefore, we only run this part of
3278        # the test if extended advertising is not available, indicating that
3279        # software advertisement rotation is being used.
3280        if not self.ext_adv_enabled():
3281            # Verify that the advertising intervals are correct.
3282            min_adv_interval_ms_found, max_adv_interval_ms_found = (
3283                    self._verify_advertising_intervals(min_adv_interval_ms,
3284                                                       max_adv_interval_ms))
3285
3286            # Verify advertising remains enabled.
3287            advertising_enabled = self.bluetooth_le_facade.btmon_find(
3288                    'Advertising: Enabled (0x01)')
3289
3290            self.results.update({
3291                'min_adv_interval_ms_found': min_adv_interval_ms_found,
3292                'max_adv_interval_ms_found': max_adv_interval_ms_found,
3293                'advertising_enabled': advertising_enabled,
3294            })
3295
3296        return all(self.results.values())
3297
3298
3299    @test_retry_and_log(False)
3300    def test_unregister_advertisement(self, advertisement_data, instance_id,
3301                                      advertising_disabled):
3302        """Verify that an advertisement is unregistered correctly.
3303
3304        This test verifies the following data:
3305        - advertisement removed
3306        - advertising status: enabled if there are advertisements left;
3307                              disabled otherwise.
3308
3309        @param advertisement_data: the data of an advertisement to unregister.
3310        @param instance_id: the instance id of the advertisement to remove.
3311        @param advertising_disabled: is advertising disabled? This happens
3312                only when all advertisements are removed.
3313
3314        @returns: True if the advertisement is unregistered correctly.
3315                  False otherwise.
3316
3317        """
3318        self.count_advertisements -= 1
3319        self._get_btmon_log(
3320                lambda: self.bluetooth_le_facade.unregister_advertisement(
3321                        advertisement_data))
3322
3323        # Verify that the advertisement is removed.
3324        advertisement_removed = (
3325                self.bluetooth_le_facade.btmon_find('Advertising Removed') and
3326                self.bluetooth_le_facade.btmon_find('Instance: %d' %
3327                                                    instance_id))
3328
3329        # If advertising_disabled is True, there should be no log like
3330        #       'Advertising: Enabled (0x01)'
3331        # If advertising_disabled is False, there should be log like
3332        #       'Advertising: Enabled (0x01)'
3333
3334        # Only need to check advertising status when the last advertisement
3335        # is removed. For any other advertisements prior to the last one,
3336        # we may or may not observe 'Advertising: Enabled (0x01)' message.
3337        # Hence, the test would become flaky if we insist to see that message.
3338        # A possible workaround is to sleep for a while and then check the
3339        # message. The drawback is that we may need to wait up to 10 seconds
3340        # if the advertising duration and intervals are long.
3341        # In a test case, we always run test_check_duration_and_intervals()
3342        # to check if advertising duration and intervals are correct after
3343        # un-registering one or all advertisements, it is safe to do so.
3344        advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
3345                'Advertising: Enabled (0x01)')
3346        advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
3347                'Advertising: Disabled (0x00)')
3348        advertising_status_correct = not advertising_disabled or (
3349                advertising_disabled_found and not advertising_enabled_found)
3350
3351        self.results = {
3352                'advertisement_removed': advertisement_removed,
3353                'advertising_status_correct': advertising_status_correct,
3354        }
3355        return all(self.results.values())
3356
3357
3358    @test_retry_and_log(False)
3359    def test_set_advertising_intervals(self, min_adv_interval_ms,
3360                                       max_adv_interval_ms):
3361        """Verify that new advertising intervals are set correctly.
3362
3363        Note that setting advertising intervals does not enable/disable
3364        advertising. Hence, there is no need to check the advertising
3365        status.
3366
3367        @param min_adv_interval_ms: the min advertising interval in ms.
3368        @param max_adv_interval_ms: the max advertising interval in ms.
3369
3370        @returns: True if the new advertising intervals are correct.
3371                  False otherwise.
3372
3373        """
3374        self._get_btmon_log(
3375                lambda: self.bluetooth_le_facade.set_advertising_intervals(
3376                        min_adv_interval_ms, max_adv_interval_ms))
3377
3378        # Verify the new advertising intervals.
3379        # With intervals of 200 ms and 200 ms, the log looks like
3380        #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
3381        txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
3382        adv_intervals_found = self.bluetooth_le_facade.btmon_find(
3383                txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
3384                       self.convert_to_adv_jiffies(max_adv_interval_ms)))
3385
3386        self.results = {'adv_intervals_found': adv_intervals_found}
3387        return all(self.results.values())
3388
3389
3390    @test_retry_and_log(False)
3391    def test_fail_to_set_advertising_intervals(
3392            self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
3393            orig_min_adv_interval_ms, orig_max_adv_interval_ms):
3394        """Verify that setting invalid advertising intervals results in error.
3395
3396        If invalid min/max advertising intervals are given, it would incur
3397        the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
3398        Note that valid advertising intervals fall between 20 ms and 10,240 ms.
3399
3400        @param invalid_min_adv_interval_ms: the invalid min advertising interval
3401                in ms.
3402        @param invalid_max_adv_interval_ms: the invalid max advertising interval
3403                in ms.
3404        @param orig_min_adv_interval_ms: the original min advertising interval
3405                in ms.
3406        @param orig_max_adv_interval_ms: the original max advertising interval
3407                in ms.
3408
3409        @returns: True if it fails to set invalid advertising intervals.
3410                  False otherwise.
3411
3412        """
3413        self._get_btmon_log(
3414                lambda: self.bluetooth_le_facade.set_advertising_intervals(
3415                        invalid_min_adv_interval_ms,
3416                        invalid_max_adv_interval_ms))
3417
3418        # Verify that the invalid error is observed in the dbus error callback
3419        # message.
3420        invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
3421                                   self.advertising_msg)
3422
3423        # Verify that the min/max advertising intervals remain the same
3424        # after setting the invalid advertising intervals.
3425        #
3426        # In btmon log, we would see the following message first.
3427        #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
3428        # And then, we should check if "Min advertising interval" and
3429        # "Max advertising interval" remain the same.
3430        start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
3431                self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
3432                self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
3433
3434        search_strings = ['Min advertising interval:',
3435                          'Max advertising interval:']
3436        search_str = '|'.join(search_strings)
3437
3438        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
3439                                                      start_str=start_str)
3440
3441        # The min/max advertising intervals of all advertisements should remain
3442        # the same as the previous valid ones.
3443        min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
3444        min_max_pattern = re.compile(min_max_str)
3445        correct_orig_min_adv_interval = True
3446        correct_orig_max_adv_interval = True
3447        for line in contents:
3448            result = min_max_pattern.search(line)
3449            if result:
3450                interval = float(result.group(1))
3451                if 'Min' in line and interval != orig_min_adv_interval_ms:
3452                    correct_orig_min_adv_interval = False
3453                elif 'Max' in line and interval != orig_max_adv_interval_ms:
3454                    correct_orig_max_adv_interval = False
3455
3456        self.results = {
3457                'invalid_intervals_error': invalid_intervals_error,
3458                'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
3459                'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
3460
3461        return all(self.results.values())
3462
3463
3464    @test_retry_and_log(False)
3465    def test_check_advertising_intervals(self, min_adv_interval_ms,
3466                                         max_adv_interval_ms):
3467        """Verify that the advertising intervals are as expected.
3468
3469        @param min_adv_interval_ms: the min advertising interval in ms.
3470        @param max_adv_interval_ms: the max advertising interval in ms.
3471
3472        @returns: True if the advertising intervals are correct.
3473                  False otherwise.
3474
3475        """
3476        self._get_btmon_log(None)
3477
3478        # Verify that the advertising intervals are correct.
3479        min_adv_interval_ms_found, max_adv_interval_ms_found = (
3480                self._verify_advertising_intervals(min_adv_interval_ms,
3481                                                   max_adv_interval_ms))
3482
3483        self.results = {
3484                'min_adv_interval_ms_found': min_adv_interval_ms_found,
3485                'max_adv_interval_ms_found': max_adv_interval_ms_found,
3486        }
3487        return all(self.results.values())
3488
3489
3490    @test_retry_and_log(False)
3491    def test_reset_advertising(self, instance_ids=[]):
3492        """Verify that advertising is reset correctly.
3493
3494        Note that reset advertising would set advertising intervals to
3495        the default values. However, we would not be able to observe
3496        the values change until new advertisements are registered.
3497        Therefore, it is required that a test_register_advertisement()
3498        test is conducted after this test.
3499
3500        If instance_ids is [], all advertisements would still be removed
3501        if there are any. However, no need to check 'Advertising Removed'
3502        in btmon log since we may or may not be able to observe the message.
3503        This feature is needed if this test is invoked as the first one in
3504        a test case to reset advertising. In this situation, this test does
3505        not know how many advertisements exist.
3506
3507        @param instance_ids: the list of instance IDs that should be removed.
3508
3509        @returns: True if advertising is reset correctly.
3510                  False otherwise.
3511
3512        """
3513        self.count_advertisements = 0
3514        self._get_btmon_log(
3515                lambda: self.bluetooth_le_facade.reset_advertising())
3516
3517        # Verify that every advertisement is removed. When an advertisement
3518        # with instance id 1 is removed, the log looks like
3519        #   Advertising Removed
3520        #       instance: 1
3521        if len(instance_ids) > 0:
3522            advertisement_removed = self.bluetooth_le_facade.btmon_find(
3523                    'Advertising Removed')
3524            if advertisement_removed:
3525                for instance_id in instance_ids:
3526                    txt = 'Instance: %d' % instance_id
3527                    if not self.bluetooth_le_facade.btmon_find(txt):
3528                        advertisement_removed = False
3529                        break
3530        else:
3531            advertisement_removed = True
3532
3533        if not advertisement_removed:
3534            logging.error('Failed to remove advertisement')
3535
3536        # Verify the advertising is disabled.
3537        advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
3538                'Advertising: Disabled')
3539        # If there are no existing advertisements, we may not observe
3540        # 'Advertising: Disabled'.
3541        advertising_disabled = (instance_ids == [] or
3542                                advertising_disabled_observied)
3543
3544        self.results = {
3545                'advertisement_removed': advertisement_removed,
3546                'advertising_disabled': advertising_disabled,
3547        }
3548        return all(self.results.values())
3549
3550
3551    @test_retry_and_log(False)
3552    def test_receive_advertisement(self, address=None, UUID=None, timeout=10):
3553        """Verifies that we receive an advertisement with specific contents
3554
3555        Since test_discover_device only uses the existence of a device dbus path
3556        to indicate when a device is discovered, it is not adequate if we want
3557        to verify that we have received an advertisement from a device. This
3558        test monitors btmon around a discovery instance and searches for the
3559        relevant advertising report.
3560
3561        @param address: String address of peer
3562        @param UUID: String of hex data
3563        @param timeout: seconds to listen for traffic
3564
3565        @returns True if report was located, otherwise False
3566        """
3567
3568        def _discover_devices():
3569            self.test_start_discovery()
3570            time.sleep(timeout)
3571            self.test_stop_discovery()
3572
3573        # Run discovery, record btmon log
3574        self._get_btmon_log(_discover_devices)
3575
3576        # Grab all logs received
3577        btmon_log = '\n'.join(self.bluetooth_le_facade.btmon_get('', ''))
3578
3579        desired_strs = []
3580
3581        if address is not None:
3582            desired_strs.append('Address: {}'.format(address))
3583
3584        if UUID is not None:
3585            desired_strs.append('({})'.format(UUID))
3586
3587        # Split btmon events by HCI and MGMT delimiters
3588        event_delimiter = '|'.join(['@ MGMT', '> HCI', '< HCI'])
3589        btmon_events = re.split(event_delimiter, btmon_log)
3590
3591        features_located = False
3592
3593        for event_str in btmon_events:
3594            if 'Advertising Report' not in event_str:
3595                continue
3596
3597            for desired_str in desired_strs:
3598                if desired_str not in event_str:
3599                    break
3600
3601            else:
3602                features_located = True
3603
3604        self.results = {
3605                'features_located': features_located,
3606        }
3607        return all(self.results.values())
3608
3609
3610    def add_device(self, address, address_type, action):
3611        """Add a device to the Kernel action list."""
3612        return self.bluetooth_facade.add_device(address, address_type, action)
3613
3614
3615    def remove_device(self, address, address_type):
3616        """Remove a device from the Kernel action list."""
3617        return self.bluetooth_facade.remove_device(address,address_type)
3618
3619
3620    def read_supported_commands(self):
3621        """Read the set of supported commands from the Kernel."""
3622        return self.bluetooth_facade.read_supported_commands()
3623
3624
3625    def read_info(self):
3626        """Read the adapter information from the Kernel."""
3627        return self.bluetooth_facade.read_info()
3628
3629
3630    def get_adapter_properties(self):
3631        """Read the adapter properties from the Bluetooth Daemon."""
3632        return self.bluetooth_facade.get_adapter_properties()
3633
3634
3635    def get_dev_info(self):
3636        """Read raw HCI device information."""
3637        return self.bluetooth_facade.get_dev_info()
3638
3639    def log_settings(self, msg, settings):
3640        """function convert MGMT_OP_READ_INFO settings to string
3641
3642        @param msg: string to include in output
3643        @param settings: bitstring returned by MGMT_OP_READ_INFO
3644        @return : List of strings indicating different settings
3645        """
3646        strs = []
3647        if settings & bluetooth_socket.MGMT_SETTING_POWERED:
3648            strs.append("POWERED")
3649        if settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE:
3650            strs.append("CONNECTABLE")
3651        if settings & bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE:
3652            strs.append("FAST-CONNECTABLE")
3653        if settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE:
3654            strs.append("DISCOVERABLE")
3655        if settings & bluetooth_socket.MGMT_SETTING_PAIRABLE:
3656            strs.append("PAIRABLE")
3657        if settings & bluetooth_socket.MGMT_SETTING_LINK_SECURITY:
3658            strs.append("LINK-SECURITY")
3659        if settings & bluetooth_socket.MGMT_SETTING_SSP:
3660            strs.append("SSP")
3661        if settings & bluetooth_socket.MGMT_SETTING_BREDR:
3662            strs.append("BR/EDR")
3663        if settings & bluetooth_socket.MGMT_SETTING_HS:
3664            strs.append("HS")
3665        if settings & bluetooth_socket.MGMT_SETTING_LE:
3666            strs.append("LE")
3667        logging.debug('%s : %s', msg, " ".join(strs))
3668        return strs
3669
3670    def log_flags(self, msg, flags):
3671        """Function to convert HCI state configuration to a string
3672
3673        @param msg: string to include in output
3674        @param settings: bitstring returned by get_dev_info
3675        @return : List of strings indicating different flags
3676        """
3677        strs = []
3678        if flags & bluetooth_socket.HCI_UP:
3679            strs.append("UP")
3680        else:
3681            strs.append("DOWN")
3682        if flags & bluetooth_socket.HCI_INIT:
3683            strs.append("INIT")
3684        if flags & bluetooth_socket.HCI_RUNNING:
3685            strs.append("RUNNING")
3686        if flags & bluetooth_socket.HCI_PSCAN:
3687            strs.append("PSCAN")
3688        if flags & bluetooth_socket.HCI_ISCAN:
3689            strs.append("ISCAN")
3690        if flags & bluetooth_socket.HCI_AUTH:
3691            strs.append("AUTH")
3692        if flags & bluetooth_socket.HCI_ENCRYPT:
3693            strs.append("ENCRYPT")
3694        if flags & bluetooth_socket.HCI_INQUIRY:
3695            strs.append("INQUIRY")
3696        if flags & bluetooth_socket.HCI_RAW:
3697            strs.append("RAW")
3698        logging.debug('%s [HCI]: %s', msg, " ".join(strs))
3699        return strs
3700
3701
3702    @test_retry_and_log(False)
3703    def test_service_resolved(self, address):
3704        """Test that the services under device address can be resolved
3705
3706        @param address: MAC address of a device
3707
3708        @returns: True if the ServicesResolved property is changed before
3709                 timeout, False otherwise.
3710
3711        """
3712        is_resolved_func = self.bluetooth_facade.device_services_resolved
3713        return self._wait_for_condition(lambda : is_resolved_func(address),\
3714                                        method_name())
3715
3716
3717    @test_retry_and_log(False)
3718    def test_gatt_browse(self, address):
3719        """Test that the GATT client can get the attributes correctly
3720
3721        @param address: MAC address of a device
3722
3723        @returns: True if the attribute map received by GATT client is the same
3724                  as expected. False otherwise.
3725
3726        """
3727
3728        gatt_client_facade = GATT_ClientFacade(self.bluetooth_facade)
3729        actual_app = gatt_client_facade.browse(address)
3730        expected_app = GATT_HIDApplication()
3731        diff = GATT_Application.diff(actual_app, expected_app)
3732
3733        self.result = {
3734            'actural_result': actual_app,
3735            'expected_result': expected_app
3736        }
3737
3738        gatt_attribute_hierarchy = ['Device', 'Service', 'Characteristic',
3739                                    'Descriptor']
3740        # Remove any difference in object path
3741        for parent, child in zip(gatt_attribute_hierarchy,
3742                                 gatt_attribute_hierarchy[1:]):
3743            pattern = re.compile('^%s .* is different in %s' % (child, parent))
3744            for diff_str in diff[::]:
3745                if pattern.search(diff_str):
3746                    diff.remove(diff_str)
3747
3748        if len(diff) != 0:
3749            logging.error('Application Diff: %s', diff)
3750            return False
3751        return True
3752
3753
    def _record_input_events(self, device, gesture, address=None):
        """Record the input events produced on the DUT by a gesture.

        The input recorder for device.name is initialized and started, the
        gesture is performed, the recorder is stopped and the captured
        events are fetched. The code sleeps for HID_REPORT_SLEEP_SECS before
        the gesture, after the gesture and after stopping the recorder.

        @param device: the bluetooth HID device.
        @param gesture: the gesture method to perform; it is called with no
                        arguments.
        @param address: forwarded as uniq to initialize_input_recorder;
                        presumably selects the input device by its address
                        -- TODO confirm against the input facade.

        @returns: the input events received on the DUT, as a list of Event.

        """
        self.input_facade.initialize_input_recorder(device.name, uniq=address)
        self.input_facade.start_input_recorder(device.name)
        # Pause between starting the recorder and performing the gesture.
        time.sleep(self.HID_REPORT_SLEEP_SECS)
        gesture()
        # Pause again before stopping the recorder.
        time.sleep(self.HID_REPORT_SLEEP_SECS)
        self.input_facade.stop_input_recorder(device.name)
        time.sleep(self.HID_REPORT_SLEEP_SECS)
        # Each fetched value is unpacked into the Event constructor.
        event_values = self.input_facade.get_input_events(device.name)
        events = [Event(*ev) for ev in event_values]
        return events
3773
3774
3775    # -------------------------------------------------------------------
3776    # Bluetooth mouse related tests
3777    # -------------------------------------------------------------------
3778
3779
3780    def _test_mouse_click(self, device, button):
3781        """Test that the mouse click events could be received correctly.
3782
3783        @param device: the meta device containing a bluetooth HID device
3784        @param button: which button to test, 'LEFT' or 'RIGHT'
3785
3786        @returns: True if the report received by the host matches the
3787                  expected one. False otherwise.
3788
3789        """
3790        if button == 'LEFT':
3791            gesture = device.LeftClick
3792        elif button == 'RIGHT':
3793            gesture = device.RightClick
3794        else:
3795            raise error.TestError('Button (%s) is not valid.' % button)
3796
3797        actual_events = self._record_input_events(device,
3798                                                  gesture,
3799                                                  address=device.address)
3800
3801        linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
3802        expected_events = [
3803                # Button down
3804                recorder.MSC_SCAN_BTN_EVENT[button],
3805                Event(EV_KEY, linux_input_button[button], 1),
3806                recorder.SYN_EVENT,
3807                # Button up
3808                recorder.MSC_SCAN_BTN_EVENT[button],
3809                Event(EV_KEY, linux_input_button[button], 0),
3810                recorder.SYN_EVENT]
3811
3812        self.results = {
3813                'actual_events': list(map(str, actual_events)),
3814                'expected_events': list(map(str, expected_events))}
3815        return actual_events == expected_events
3816
3817
3818    @test_retry_and_log
3819    def test_mouse_left_click(self, device):
3820        """Test that the mouse left click events could be received correctly.
3821
3822        @param device: the meta device containing a bluetooth HID device
3823
3824        @returns: True if the report received by the host matches the
3825                  expected one. False otherwise.
3826
3827        """
3828        return self._test_mouse_click(device, 'LEFT')
3829
3830
3831    @test_retry_and_log
3832    def test_mouse_right_click(self, device):
3833        """Test that the mouse right click events could be received correctly.
3834
3835        @param device: the meta device containing a bluetooth HID device
3836
3837        @returns: True if the report received by the host matches the
3838                  expected one. False otherwise.
3839
3840        """
3841        return self._test_mouse_click(device, 'RIGHT')
3842
3843
3844    def _test_mouse_move(self, device, delta_x=0, delta_y=0):
3845        """Test that the mouse move events could be received correctly.
3846
3847        @param device: the meta device containing a bluetooth HID device
3848        @param delta_x: the distance to move cursor in x axis
3849        @param delta_y: the distance to move cursor in y axis
3850
3851        @returns: True if the report received by the host matches the
3852                  expected one. False otherwise.
3853
3854        """
3855        gesture = lambda: device.Move(delta_x, delta_y)
3856        actual_events = self._record_input_events(device,
3857                                                  gesture,
3858                                                  address=device.address)
3859
3860        events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
3861        events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
3862        expected_events = events_x + events_y + [recorder.SYN_EVENT]
3863
3864        self.results = {
3865                'actual_events': list(map(str, actual_events)),
3866                'expected_events': list(map(str, expected_events))}
3867        return actual_events == expected_events
3868
3869
3870    @test_retry_and_log
3871    def test_mouse_move_in_x(self, device, delta_x):
3872        """Test that the mouse move events in x could be received correctly.
3873
3874        @param device: the meta device containing a bluetooth HID device
3875        @param delta_x: the distance to move cursor in x axis
3876
3877        @returns: True if the report received by the host matches the
3878                  expected one. False otherwise.
3879
3880        """
3881        return self._test_mouse_move(device, delta_x=delta_x)
3882
3883
3884    @test_retry_and_log
3885    def test_mouse_move_in_y(self, device, delta_y):
3886        """Test that the mouse move events in y could be received correctly.
3887
3888        @param device: the meta device containing a bluetooth HID device
3889        @param delta_y: the distance to move cursor in y axis
3890
3891        @returns: True if the report received by the host matches the
3892                  expected one. False otherwise.
3893
3894        """
3895        return self._test_mouse_move(device, delta_y=delta_y)
3896
3897
3898    @test_retry_and_log
3899    def test_mouse_move_in_xy(self, device, delta_x=-60, delta_y=100):
3900        """Test that the mouse move events could be received correctly.
3901
3902        @param device: the meta device containing a bluetooth HID device
3903        @param delta_x: the distance to move cursor in x axis
3904        @param delta_y: the distance to move cursor in y axis
3905
3906        @returns: True if the report received by the host matches the
3907                  expected one. False otherwise.
3908
3909        """
3910        return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
3911
3912
3913    def _test_mouse_scroll(self, device, units):
3914        """Test that the mouse wheel events could be received correctly.
3915
3916        @param device: the meta device containing a bluetooth HID device
3917        @param units: the units to scroll in y axis
3918
3919        @returns: True if the report received by the host matches the
3920                  expected one. False otherwise.
3921
3922        """
3923        gesture = lambda: device.Scroll(units)
3924        recorded_events = self._record_input_events(device,
3925                                                    gesture,
3926                                                    address=device.address)
3927
3928        # Since high-speed scrolling events are inserted after they are passed
3929        # through bluetooth module, we ignore these events since they are
3930        # irrelevant for us
3931        scroll_events = [ev for ev in recorded_events
3932                            if ev.code != REL_WHEEL_HI_RES]
3933
3934        expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
3935        self.results = {
3936                'scroll_events': list(map(str, scroll_events)),
3937                'expected_events': list(map(str, expected_events))}
3938        return scroll_events == expected_events
3939
3940
3941    @test_retry_and_log
3942    def test_mouse_scroll_down(self, device, delta_y):
3943        """Test that the mouse wheel events could be received correctly.
3944
3945        @param device: the meta device containing a bluetooth HID device
3946        @param delta_y: the units to scroll down in y axis;
3947                        should be a postive value
3948
3949        @returns: True if the report received by the host matches the
3950                  expected one. False otherwise.
3951
3952        """
3953        if delta_y > 0:
3954            return self._test_mouse_scroll(device, delta_y)
3955        else:
3956            raise error.TestError('delta_y (%d) should be a positive value',
3957                                  delta_y)
3958
3959
3960    @test_retry_and_log
3961    def test_mouse_scroll_up(self, device, delta_y):
3962        """Test that the mouse wheel events could be received correctly.
3963
3964        @param device: the meta device containing a bluetooth HID device
3965        @param delta_y: the units to scroll up in y axis;
3966                        should be a postive value
3967
3968        @returns: True if the report received by the host matches the
3969                  expected one. False otherwise.
3970
3971        """
3972        if delta_y > 0:
3973            return self._test_mouse_scroll(device, -delta_y)
3974        else:
3975            raise error.TestError('delta_y (%d) should be a positive value',
3976                                  delta_y)
3977
3978
3979    @test_retry_and_log
3980    def test_mouse_click_and_drag(self, device, delta_x, delta_y):
3981        """Test that the mouse click-and-drag events could be received
3982        correctly.
3983
3984        @param device: the meta device containing a bluetooth HID device
3985        @param delta_x: the distance to drag in x axis
3986        @param delta_y: the distance to drag in y axis
3987
3988        @returns: True if the report received by the host matches the
3989                  expected one. False otherwise.
3990
3991        """
3992        gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
3993        actual_events = self._record_input_events(device,
3994                                                  gesture,
3995                                                  address=device.address)
3996
3997        button = 'LEFT'
3998        expected_events = (
3999                [# Button down
4000                 recorder.MSC_SCAN_BTN_EVENT[button],
4001                 Event(EV_KEY, BTN_LEFT, 1),
4002                 recorder.SYN_EVENT] +
4003                # cursor movement in x and y
4004                ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
4005                ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
4006                [recorder.SYN_EVENT] +
4007                # Button up
4008                [recorder.MSC_SCAN_BTN_EVENT[button],
4009                 Event(EV_KEY, BTN_LEFT, 0),
4010                 recorder.SYN_EVENT])
4011
4012        self.results = {
4013                'actual_events': list(map(str, actual_events)),
4014                'expected_events': list(map(str, expected_events))}
4015        return actual_events == expected_events
4016
4017
4018    # -------------------------------------------------------------------
4019    # Bluetooth keyboard related tests
4020    # -------------------------------------------------------------------
4021
4022    @test_retry_and_log
4023    def test_keyboard_input_from_trace(self, device, trace_name):
4024        """ Tests that keyboard events can be transmitted and received correctly
4025
4026        @param device: the meta device containing a bluetooth HID device
4027        @param trace_name: string name for keyboard activity trace to be used
4028                           in the test i.e. "simple_text"
4029
4030        @returns: true if the recorded output matches the expected output
4031                  false otherwise
4032        """
4033        length_correct = True
4034        content_correct = True
4035
4036        # Read data from trace I/O files
4037        input_trace = bluetooth_test_utils.parse_trace_file(os.path.join(
4038                      TRACE_LOCATION, '{}_input.txt'.format(trace_name)))
4039        output_trace = bluetooth_test_utils.parse_trace_file(os.path.join(
4040                      TRACE_LOCATION, '{}_output.txt'.format(trace_name)))
4041
4042        if not input_trace or not output_trace:
4043            logging.error('Failure in using trace')
4044            return False
4045
4046        # Disregard timing data for now
4047        input_scan_codes = [tup[1] for tup in input_trace]
4048        predicted_events = [Event(*tup[1]) for tup in output_trace]
4049
4050        # Create and run this trace as a gesture
4051        gesture = lambda: device.KeyboardSendTrace(input_scan_codes)
4052        rec_events = self._record_input_events(device,
4053                                               gesture,
4054                                               address=device.address)
4055
4056        # Filter out any input events that were not from the keyboard
4057        rec_key_events = [ev for ev in rec_events if ev.type == EV_KEY]
4058
4059        # Fail if we didn't record the correct number of events
4060        if len(rec_key_events) != len(input_scan_codes):
4061            logging.info('Expected {} events, received {}'.format(
4062                    len(input_scan_codes), len(rec_key_events)))
4063            length_correct = False
4064
4065        for idx, predicted in enumerate(predicted_events):
4066            recorded = rec_key_events[idx]
4067
4068            if not predicted == recorded:
4069                content_correct = False
4070                break
4071
4072        self.results = {
4073            'received_events': len(rec_key_events) > 0,
4074            'length_correct': length_correct,
4075            'content_correct': content_correct,
4076        }
4077
4078        return all(self.results)
4079
4080
4081    def is_newer_kernel_version(self, version, minimum_version):
4082        """ Check if given kernel version is newer than unsupported version."""
4083
4084        return utils.compare_versions(version, minimum_version) >= 0
4085
4086
4087    def is_supported_kernel_version(self, kernel_version, minimum_version,
4088                                    msg=None):
4089        """ Check if kernel version is greater than minimum version.
4090
4091            Check if given kernel version is greater than or equal to minimum
4092            version. Raise TEST_NA if given kernel version is lower than the
4093            minimum version.
4094
4095            Note: Kernel version may have suffixes, so ensure that minimum
4096            version should be the smallest version that is permissible.
4097            Ex: If minimum version is 3.8.11 then 3.8.11-<random> will
4098            pass the check.
4099
4100            @param kernel_version: kernel version to be checked as a string
4101            @param: minimum_version: minimum kernel version requried
4102
4103            @returns: None
4104
4105            @raises: TEST_NA if kernel version is not greater than the minimum
4106                     version
4107        """
4108
4109        logging.debug('kernel version is {} minimum version'
4110                      'is {}'.format(kernel_version,minimum_version))
4111
4112        if msg is None:
4113            msg = 'Test not supported on this kernel version'
4114
4115        if not self.is_newer_kernel_version(kernel_version, minimum_version):
4116            logging.info('Kernel version check failed: %s', msg)
4117            raise error.TestNAError(msg)
4118
4119        logging.debug('Kernel version check passed')
4120
4121
4122    # -------------------------------------------------------------------
4123    # Bluetooth AVRCP related test
4124    # -------------------------------------------------------------------
4125
4126
4127    @test_retry_and_log
4128    def test_avrcp_event(self, device, generator, avrcp_event):
4129        """Tests that AVRCP events can be transmitted and received correctly
4130
4131        @param device: the meta device containing a Bluetooth AVRCP capable
4132                       audio device.
4133        @param generator: the peer device generator/function which trigger
4134                          the AVRCP event.
4135        @param avrcp_event: the AVRCP event to test.
4136
4137        @returns: true if the recorded output matches the expected output
4138                  false otherwise
4139        """
4140        logging.debug('AVRCP Event Test, Event: %s', avrcp_event)
4141        linux_input_button = {'play': KEY_PLAYCD, 'pause': KEY_PAUSECD,
4142                              'stop': KEY_STOPCD, 'next': KEY_NEXTSONG,
4143                              'previous': KEY_PREVIOUSSONG}
4144        expected_event = [
4145                # Button down
4146                Event(EV_KEY, linux_input_button[avrcp_event], 1),
4147                recorder.SYN_EVENT,
4148                # Button up
4149                Event(EV_KEY, linux_input_button[avrcp_event], 0),
4150                recorder.SYN_EVENT]
4151
4152        gesture = lambda: generator(avrcp_event)
4153        actual_event = self._record_input_events(device, gesture)
4154
4155        return actual_event == expected_event
4156
4157
4158    # -------------------------------------------------------------------
4159    # Enterprise policy tests
4160    # -------------------------------------------------------------------
4161
4162    def _test_check_set_allowlist(self, uuids, expected_result):
4163        """The test to set valid and invalid allowlists test.
4164
4165        @param uuids: the uuids in the allowlist to set.
4166        @param expected_result: True if the test is expected to pass.
4167        """
4168        create_uuid = bluetooth_test_utils.Bluetooth_UUID.create_valid_uuid
4169        exp_res_str = 'valid' if expected_result else 'invalid'
4170        logging.info('%s uuids: "%s"', exp_res_str, uuids)
4171
4172        result, err_msg = self.bluetooth_facade.policy_set_service_allow_list(
4173                uuids)
4174        logging.debug('result %s (%s)', result, err_msg)
4175
4176        if expected_result:
4177            check_set_allowlist = result
4178        else:
4179            check_set_allowlist = ('org.bluez.Error.InvalidArguments' in err_msg
4180                                   and not result)
4181
4182        # Query bluez to read the allow list.
4183        actual_uuids_list = [
4184                create_uuid(uuid) for uuid in
4185                self.bluetooth_facade.policy_get_service_allow_list()]
4186        actual_uuids_list.sort()
4187
4188        # Convert the original UUIDs into a list of full-length UUIDs and
4189        # remove duplicate UUIDs in order to compare the original UUIDs
4190        # with the actual UUIDs set by bluez.
4191        orig_uuids_list = []
4192        if expected_result and uuids != '':
4193            for uuid in uuids.split(','):
4194                u = create_uuid(uuid)
4195                if u is None:
4196                    raise error.TestFail('uuid %s in uuids %s is not valid' %
4197                                         (uuid, uuids))
4198                orig_uuids_list.append(u)
4199        orig_dedup_uuids = list(set(orig_uuids_list))
4200        orig_dedup_uuids.sort()
4201        uuids_comp_result = actual_uuids_list == orig_dedup_uuids
4202
4203        self.results = {'uuids': uuids,
4204                        'expected_set_allowlist_result': expected_result,
4205                        'actual_set_allowlist_result': result,
4206                        'orig_dedup_uuids': orig_dedup_uuids,
4207                        'actual_uuids_list': actual_uuids_list,
4208                        'check_set_allowlist': check_set_allowlist,
4209                        'uuids_comp_result': uuids_comp_result}
4210        logging.debug('actual_uuids_list %s', actual_uuids_list)
4211        logging.debug('orig_uuids_list %s', orig_uuids_list)
4212
4213        return (check_set_allowlist and uuids_comp_result)
4214
4215
4216    @test_retry_and_log(False)
4217    def test_check_set_allowlist(self, uuids, expected_result):
4218        """The test to set valid and invalid allowlists test.
4219
4220        @param uuids: the uuids in the allowlist to set.
4221        @param expected_result: True if the test is expected to pass.
4222        """
4223        return self._test_check_set_allowlist(uuids, expected_result)
4224
4225
4226    @test_retry_and_log(False)
4227    def test_reset_allowlist(self):
4228        """The test to reset the allowlists.
4229
4230        The test is used to clean up the allowlist.
4231        """
4232        return self._test_check_set_allowlist('', True)
4233
4234
4235    def policy_is_affected(self, device):
4236        """Check if the device is affected by policy.
4237
4238        @param device: the connected device.
4239
4240        @returns: True if the device is affected by the enterprise policy.
4241                  False if not. None if the device is not found.
4242        """
4243        return self.bluetooth_facade.policy_get_device_affected(device.address)
4244
4245
4246    @test_retry_and_log(False)
4247    def test_affected_by_policy(self, device):
4248        """A test that the device is affected by policy
4249
4250        @param device: the peripheral device
4251        @returns: True if the device is affected; False otherwise.
4252        """
4253        result = self.policy_is_affected(device)
4254        logging.debug('policy_is_affected(%s): %s', device.address, result)
4255        self.results = {
4256                'expected_result': 'True (affected)',
4257                'actual_result': result
4258        }
4259        return result is True
4260
4261
4262    @test_retry_and_log(False)
4263    def test_not_affected_by_policy(self, device):
4264        """A test that the device is not affected by policy
4265
4266        @param device: the peripheral device
4267        @returns: True if the device is not affected; False otherwise.
4268        """
4269        result = self.policy_is_affected(device)
4270        logging.debug('policy_is_affected(%s): %s', device.address, result)
4271        self.results = {
4272                'expected_result': 'False (not affected)',
4273                'actual_result': result
4274        }
4275        return result is False
4276
4277    def check_if_affected_by_policy(self, device, expected_result):
4278        """A test that the device policy is enforced correctly
4279
4280        @param device: the peripheral device
4281        @param expected_result: True if the test is expected to pass.
4282
4283        @returns: True if the device is affected or not affected per
4284                  expected_result; False otherwise.
4285        """
4286        if expected_result:
4287            return self.test_affected_by_policy(device)
4288        else:
4289            return self.test_not_affected_by_policy(device)
4290
4291
4292    # -------------------------------------------------------------------
4293    # Servod related tests
4294    # -------------------------------------------------------------------
4295
4296    @test_retry_and_log
4297    def test_power_consumption(self, device, max_power_mw):
4298        """Test the average power consumption."""
4299        power_mw = device.servod.MeasurePowerConsumption()
4300        self.results = {'power_mw': power_mw}
4301
4302        if (power_mw is None):
4303            logging.error('Failed to measure power consumption')
4304            return False
4305
4306        power_mw = float(power_mw)
4307        logging.info('power consumption (mw): %f (max allowed: %f)',
4308                     power_mw, max_power_mw)
4309
4310        return power_mw <= max_power_mw
4311
4312
4313    @test_retry_and_log
4314    def test_start_notify(self, object_path, cccd_value):
4315        """Test that a notification can be started on a characteristic
4316
4317        @param object_path: the object path of the characteristic.
4318        @param cccd_value: Possible CCCD values include
4319               0x00 - inferred from the remote characteristic's properties
4320               0x01 - notification
4321               0x02 - indication
4322
4323        @returns: The test results.
4324
4325        """
4326        if object_path is None:
4327            logging.error('Invalid object path')
4328            return False
4329
4330        start_notify = self.bluetooth_facade.start_notify(
4331            object_path, cccd_value)
4332        is_notifying = self._wait_for_condition(
4333            lambda: self.bluetooth_facade.is_notifying(
4334                object_path), method_name())
4335
4336        self.results = {
4337            'start_notify': start_notify,
4338            'is_notifying': is_notifying}
4339
4340        return all(self.results.values())
4341
4342
4343    @test_retry_and_log
4344    def test_stop_notify(self, object_path):
4345        """Test that a notification can be stopped on a characteristic
4346
4347        @param object_path: the object path of the characteristic.
4348
4349        @returns: The test results.
4350
4351        """
4352        if object_path is None:
4353            logging.error('Invalid object path')
4354            return False
4355
4356        stop_notify = self.bluetooth_facade.stop_notify(object_path)
4357        is_not_notifying = self._wait_for_condition(
4358            lambda: not self.bluetooth_facade.is_notifying(
4359                object_path), method_name())
4360
4361        self.results = {
4362            'stop_notify': stop_notify,
4363            'is_not_notifying': is_not_notifying}
4364
4365        return all(self.results.values())
4366
4367
4368    @test_retry_and_log(False)
4369    def test_set_discovery_filter(self, filter):
4370        """Test set discovery filter"""
4371
4372        return self.bluetooth_facade.set_discovery_filter(filter)
4373
4374
4375    @test_retry_and_log(False)
4376    def test_set_le_connection_parameters(self, address, parameters):
4377        """Test set LE connection parameters"""
4378
4379        return self.bluetooth_facade.set_le_connection_parameters(
4380            address, parameters)
4381
4382
4383    @test_retry_and_log(False)
4384    def test_get_connection_info(self, address):
4385        """Test that connection info to device is retrievable."""
4386
4387        return (self.bluetooth_facade.get_connection_info(address)
4388                is not None)
4389
4390
4391    @test_retry_and_log(False, messages_stop=False)
4392    def test_suspend_and_wait_for_sleep(self, suspend, sleep_timeout):
4393        """ Suspend the device and wait until it is sleeping.
4394
4395        @param suspend: Sub-process that does the actual suspend call.
4396        @param sleep_timeout time limit in seconds to allow the host sleep.
4397
4398        @return True if host is asleep within a short timeout, False otherwise.
4399        """
4400        suspend.start()
4401        try:
4402            self.host.test_wait_for_sleep(sleep_timeout)
4403        except Exception as e:
4404            suspend.join()
4405            self.results = {'exception': str(e)}
4406            return False
4407
4408        return True
4409
4410
4411    @test_retry_and_log(False, messages_start=False)
4412    def test_wait_for_resume(self,
4413                             boot_id,
4414                             suspend,
4415                             resume_timeout,
4416                             test_start_time,
4417                             resume_slack=RESUME_DELTA,
4418                             fail_on_timeout=False,
4419                             fail_early_wake=True,
4420                             collect_resume_time=False):
4421        """ Wait for device to resume from suspend.
4422
4423        @param boot_id: Current boot id
4424        @param suspend: Sub-process that does actual suspend call.
4425        @param resume_timeout: Expect device to resume in given timeout.
4426        @param test_start_time: When was this test started? (device time)
4427        @param resume_slack: Allow some slack on resume timeout.
4428        @param fail_on_timeout: Fails if timeout is reached
4429        @param fail_early_wake: Fails if timeout isn't reached
4430        @param collect_resume_time: Collect time to resume as perf keyval.
4431
4432        @return True if suspend sub-process completed without error.
4433        """
4434        success = False
4435        results = {}
4436
4437        def _check_timeout(delta):
4438            if delta > timedelta(seconds=resume_timeout):
4439                return not fail_on_timeout
4440            else:
4441                return not fail_early_wake
4442
4443        def _check_suspend_attempt_or_raise(test_start, wake_at):
4444            """Make sure suspend attempt was recent or raise TestNA.
4445
4446            If we're looking at a previous suspend attempt, it means the test
4447            didn't trigger a suspend properly (i.e. no powerd call)
4448
4449            @param test_start: When we started the test.
4450            @param wake_at: When powerd suspend resumed.
4451
4452            @raises: error.TestNAError if found suspend occurred before we
4453                     started the test.
4454            """
4455            # If the last suspend attempt was before we started the test, it's
4456            # probably not a recent attempt.
4457            if wake_at < test_start:
4458                raise error.TestNAError(
4459                        'No recent suspend attempt found. '
4460                        'Started test at {} but last suspend ended at {}'.
4461                        format(test_start, wake_at))
4462
4463            # If the last suspend attempt recorded time is some time in the
4464            # future, probably a time conversion error occurred.
4465            current_time = self.bluetooth_facade.get_device_utc_time()
4466            if current_time < wake_at:
4467                raise error.TestFail(
4468                        'Timezone conversion error found. '
4469                        'Last suspend ended at {} but current time is {}'.
4470                        format(wake_at, current_time))
4471
4472            return True
4473
4474        def _check_retcode_or_raise(retcode):
4475            """Make sure powerd return was successful.
4476
4477            @param retcode: Return code of powerd_suspend.
4478
4479            @raises: error.TestNAError if failed suspend due to non-BT
4480            @return: False if BT woke us, True otherwise
4481            """
4482            if retcode:
4483                if self.bluetooth_facade.bt_caused_last_resume():
4484                    return False
4485                else:
4486                    raise error.TestNAError(
4487                            'Failed suspend due to non-BT wake')
4488
4489            return True
4490
4491        # Sometimes it takes longer to resume from suspend; give some leeway
4492        resume_timeout = resume_timeout + resume_slack
4493        results['resume timeout'] = resume_timeout
4494        try:
4495            start = datetime.now()
4496
4497            # Wait for resume needs to wait longer in case device rebooted.
4498            # Otherwise, the test will fail with errno 111 (connection refused)
4499            self.host.test_wait_for_resume(
4500                boot_id, resume_timeout=self.RESUME_INTERNAL_TIMEOUT_SECS)
4501
4502            results['device accessible on resume'] = True
4503
4504            # As of now, a timeout in test_wait_for_resume doesn't raise. Start
4505            # by first measuring the delta until network is back up to the dut.
4506            network_delta = datetime.now() - start
4507
4508            # Use powerd logs to see how much time we actually spent in suspend
4509            # If the network went down during suspend, we will have spent less
4510            # time in suspend than expected. If we can't find info via powerd,
4511            # we can use measured time instead.
4512            info = self.bluetooth_facade.find_last_suspend_via_powerd_logs()
4513            if info:
4514                start_suspend_at, end_suspend_at, retcode = info
4515                logging.debug('find_last_suspend_via_powerd_logs returned: '
4516                              'start_suspend_at: {}, end_suspend_at: {}, '
4517                              'retcode {}'.format(start_suspend_at,
4518                                                  end_suspend_at, retcode))
4519                actual_delta = end_suspend_at - start_suspend_at
4520                results['powerd time to resume'] = actual_delta.total_seconds()
4521                results['powerd retcode'] = retcode
4522
4523                # Resume is successful if suspend occurred correctly and woke up
4524                # within the timeout. One significant caveat is that we only
4525                # fail here if BT blocked suspend, not if we woke spuriously.
4526                # This is by design (we depend on the timeout to check for
4527                # spurious wakeup).
4528                try:
4529                    suspend_ok, retcode_ok, timeout_ok = False, False, False
4530                    suspend_ok = _check_suspend_attempt_or_raise(
4531                            test_start_time, end_suspend_at)
4532                    retcode_ok = _check_retcode_or_raise(retcode)
4533                    timeout_ok = _check_timeout(actual_delta)
4534                except error.TestNAError as e:
4535                    raise e
4536                finally:
4537                    logging.debug('_check_suspend_attempt_or_raise: {} '
4538                                  '_check_retcode_or_raise: {} '
4539                                  '_check_timeout: {}'.format(
4540                                          suspend_ok, retcode_ok, timeout_ok))
4541                success = suspend_ok and retcode_ok and timeout_ok
4542            else:
4543                results['time to resume'] = network_delta.total_seconds()
4544                logging.debug(
4545                        'Unable to get time to resume from powerd. Estimate sleep time '
4546                        'using network ping')
4547                success = _check_timeout(network_delta)
4548        except error.TestFail as e:
4549            results['device accessible on resume'] = False
4550            success = False
4551            logging.error('wait_for_resume: %s', e)
4552
4553            # If the resume failed due to a reboot, raise the testFail and exit
4554            # early from the test
4555            if 'client rebooted' in str(e):
4556                raise
4557        finally:
4558            suspend.join()
4559
4560        # Log wake performance
4561        if collect_resume_time:
4562            test_desc = '{}_wake_time'.format(self.test_name.replace(' ', '_'))
4563            wake_time = results.get('powerd time to resume',
4564                                    results.get('time to resume', 0))
4565            # Only write perf if wake time exists (non-zero)
4566            if wake_time:
4567                self.write_perf_keyval({test_desc: wake_time})
4568
4569        results['success'] = success
4570        results['suspend exit code'] = suspend.exitcode
4571        self.results = results
4572
4573        logging.info('test_wait_for_resume(): %r', results)
4574        return all([success, suspend.exitcode == 0])
4575
4576
4577    def suspend_async(self, suspend_time, expect_bt_wake=False):
4578        """ Suspend asynchronously and return process for joining
4579
4580        @param suspend_time: how long to stay in suspend
4581        @param expect_bt_wake: Whether we expect bluetooth to wake us from
4582            suspend. If true, we expect this resume will occur early
4583
4584        @returns multiprocessing.Process object with suspend task
4585        """
4586
4587        def _action_suspend():
4588            try:
4589                self.bluetooth_facade.do_suspend(suspend_time, expect_bt_wake)
4590            except socket.error as e:
4591                # Socket errors may occur after suspend if the underlying
4592                # connection is lost during suspend (happens if usb-ethernet
4593                # disconnects and reconnects on resume). Catch all these errors
4594                # and swallow them.
4595                logging.warning(
4596                        'Socket error on suspend. Swallowing error: %s',
4597                        str(e))
4598            return 0
4599
4600        proc = multiprocessing.Process(target=_action_suspend)
4601        proc.daemon = True
4602        return proc
4603
4604
4605    def device_connect_async(self,
4606                             device_type,
4607                             device,
4608                             adapter_address,
4609                             delay_wake=1,
4610                             should_wake=True):
4611        """ Connects peer device asynchronously with DUT.
4612
4613        This function uses a thread instead of a subprocess so that the test
4614        result is stored for the test. Otherwise, the test connection was
4615        sometimes failing but the test itself was passing.
4616
4617        @param device_type: The device type (used to check if it's LE)
4618        @param device: the meta device with the peer device
4619        @param adapter_address: the address of the adapter
4620        @param delay_wake: delay wakeup by this many seconds
4621        @param should_wake: Should this cause a wakeup?
4622
4623        @returns threading.Thread object with device connect task
4624        """
4625
4626        def _action_device_connect():
4627            time.sleep(delay_wake)
4628            if 'BLE' in device_type:
4629                # LE reconnects by advertising (dut controller will create LE
4630                # connection, not the peer device)
4631                self.test_device_set_discoverable(device, True)
4632            else:
4633                # Classic requires peer to initiate a connection to wake up the
4634                # dut
4635                connect_func = self.test_connection_by_device_only
4636                if should_wake:
4637                    connect_func(device, adapter_address)
4638                else:
4639                    # If we're not expecting wake, this connect attempt will
4640                    # probably fail.
4641                    self.ignore_failure(connect_func, device, adapter_address)
4642
4643        thread = threading.Thread(target=_action_device_connect)
4644        return thread
4645
4646
4647    @test_retry_and_log(False)
4648    def test_hid_device_created(self, device_address):
4649        """ Tests that the hid device is created before using it for tests.
4650
4651        @param device_address: Address of peripheral device
4652        """
4653        device_found = self.bluetooth_facade.wait_for_hid_device(
4654                device_address)
4655        self.results = {
4656                'device_found': device_found
4657        }
4658        return all(self.results.values())
4659
4660
4661    @test_retry_and_log(False)
4662    def test_hid_device_created_speed(self, device):
4663        """ Tests that the hid device is created with faster polling.
4664
4665        @param device: Peripheral device
4666        """
4667        device_found = self.bluetooth_facade.wait_for_hid_device(
4668                device_address=device.address, sleep_interval=0.1)
4669        self.results = {'device_found': device_found}
4670        return all(self.results.values())
4671
4672
4673    @test_retry_and_log(False)
4674    def test_hid_device_reconnect_time(self, duration, device_type):
4675        """ Tests that the hid device reconnection is fast enough.
4676
4677        @param duration: The averaged duration of HID reconnection
4678        @param device_type: Specified the type of the device
4679        """
4680
4681        if 'BLE' in device_type:
4682            max_duration = LE_HID_RECONNECT_TIME_MAX_SEC
4683        else:
4684            max_duration = HID_RECONNECT_TIME_MAX_SEC
4685
4686        self.results = {
4687                'hid_reconnect_time': duration,
4688                'max_passing_time': max_duration
4689        }
4690        return duration < max_duration
4691
4692
4693    @test_retry_and_log
4694    def test_battery_reporting(self, device):
4695        """ Tests that battery reporting through GATT can be received
4696
4697        @param device: the meta device containing a Bluetooth device
4698
4699        @returns: true if battery reporting is received
4700        """
4701
4702        def _get_battery_percentage():
4703            return self.bluetooth_facade.get_battery_property(
4704                    device.address, 'Percentage')
4705
4706        # Sometimes the battery interface isn't available on the device
4707        # right away. Wait for it to become available.
4708        utils.poll_for_condition(
4709                condition=lambda: _get_battery_percentage() is not None,
4710                timeout=self.ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
4711                sleep_interval=self.ADAPTER_POLLING_DEFAULT_SLEEP_SECS,
4712                desc='Waiting for battery on %s' % device.address)
4713
4714        return _get_battery_percentage() > 0
4715
4716    def _apply_new_adapter_alias(self, alias):
4717        """ Sets new system alias and applies discoverable setting
4718
4719        @param alias: string alias to be applied to Adapter->Alias property
4720        """
4721
4722        # Set Adapter's Alias property
4723        self.bluetooth_facade.set_adapter_alias(alias)
4724
4725        # Set discoverable setting on
4726        self.bluetooth_facade.set_discoverable(True)
4727
4728    @test_retry_and_log(False)
4729    def test_set_adapter_alias(self, alias):
4730        """ Validates that a new adapter alias is applied correctly
4731
4732        @param alias: string alias to be applied to Adapter->Alias property
4733
4734        @returns: True if the applied alias is properly applied in btmon trace
4735        """
4736
4737        orig_alias = self.get_adapter_properties()['Alias']
4738        self.bluetooth_le_facade = self.bluetooth_facade
4739
4740        # 1. Capture btmon logs around alias set operation
4741        self._get_btmon_log(lambda: self._apply_new_adapter_alias(alias))
4742
4743        # 2. Verify that name appears in btmon trace with the following format:
4744        # "Name (complete): Chromebook_BA0E" as appears in EIR data set
4745        expected_alias_str = 'Name (complete): ' + alias
4746        alias_found = self.bluetooth_facade.btmon_find(expected_alias_str)
4747
4748        # 3. Re-apply previous bluez alias as other tests expect default
4749        self.bluetooth_facade.set_adapter_alias(orig_alias)
4750
4751        self.results = {'alias_found': alias_found}
4752        return all(self.results.values())
4753
4754    # -------------------------------------------------------------------
4755    # Autotest methods
4756    # -------------------------------------------------------------------
4757
4758
4759    def initialize(self):
4760        """Initialize bluetooth adapter tests."""
4761        # Run through every tests and collect failed tests in self.fails.
4762        self.fails = []
4763
4764        # If a test depends on multiple conditions, write the results of
4765        # the conditions in self.results so that it is easy to know
4766        # what conditions failed by looking at the log.
4767        self.results = None
4768
4769        # If any known failures were seen in the logs at any time during this
4770        # test execution, we capture that here. This includes daemon crashes,
4771        # usb disconnects or any of the other known common failure reasons
4772        self.had_known_common_failure = False
4773
4774        # Some tests may instantiate a peripheral device for testing.
4775        self.devices = dict()
4776        for device_type in SUPPORTED_DEVICE_TYPES:
4777            self.devices[device_type] = list()
4778
4779        # The count of registered advertisements.
4780        self.count_advertisements = 0
4781
4782
4783    def get_device_sample_rssi(self, device, use_cached_value=True):
4784        """ Get one RSSI value of the given device.
4785
4786        @param device: the peer device to be examined RSSI
4787        @param use_cached_value: Use the cached value
4788
4789        @returns: rssi value if the device is found,
4790                  None otherwise
4791        """
4792
4793        # Maximum retry attempts of RSSI query
4794        MAX_RETRY = 3
4795        # Time between each RSSI query
4796        WAIT_TIME = 2
4797        rssi = None
4798
4799        # device could have tested RSSI if we enable check_rssi, if so, reuse it
4800        #
4801        # Note:
4802        # device is special in that hasattr(device, xxx) will evaluate to
4803        # the _Method class if xxx does not physically exist. Hence,
4804        # isinstance(device.rssi, int) instead of hasattr(device, 'rssi')
4805        # is used as the condition below.
4806        # Refer to class _Method in client/cros/chameleon/chameleon.py
4807        if isinstance(device.rssi, int) and use_cached_value:
4808            return device.rssi
4809
4810        try:
4811            self.test_start_discovery()
4812
4813            # The RSSI property is only maintained while discovery is
4814            # enabled.  Stopping discovery removes the property. Thus, look
4815            # up the RSSI without modifying discovery state.
4816            found = self.test_discover_device(device.address,
4817                                              start_discovery=False,
4818                                              stop_discovery=False)
4819
4820            if not found:
4821                logging.info('Device %s not found', device.address)
4822                return None
4823
4824            for i in range(MAX_RETRY):
4825                rssi = self.bluetooth_facade.get_device_property(
4826                        device.address, 'RSSI')
4827                if rssi:
4828                    break
4829                time.sleep(WAIT_TIME)
4830
4831            if not rssi:
4832                logging.info('RSSI of device %s not found', device.address)
4833                return None
4834
4835            device.rssi = rssi
4836            logging.info('Peer {} RSSI {}'.format(device.address, rssi))
4837
4838        finally:
4839            self.test_stop_discovery()
4840            logging.info('Clearing device for test: {}'.format(device.address))
4841            self.bluetooth_facade.remove_device_object(device.address)
4842
4843        return rssi
4844
4845    def check_floss_support(self):
4846        """ Check whether this device supports Floss
4847
4848        Check for the presence of /usr/bin/btmanagerd and fail with TESTNA
4849        if the file is not present. This should only fail on the following boards
4850        with 2GB rootfs where Floss is not enabled
4851        ['asuka', 'banon', 'bob', 'caroline', 'cave', 'celes',
4852        'chell', 'coral', 'cyan', 'edgar', 'elm', 'gru', 'hana',
4853        'kefka', 'kevin', 'lars', 'pyro', 'reef', 'reks', 'relm',
4854        'sand', 'scarlet', 'sentry', 'setzer', 'snappy', 'terra', 'ultima']
4855
4856
4857        @raises error.TestNA if device doesn't support Floss
4858        """
4859        if not self.bluetooth_facade.is_btmanagerd_present():
4860            raise error.TestNAError('Floss cannot be enabled on this device')
4861
4862    def verify_device_rssi(self, device_list):
4863        """ Test device rssi is over required threshold.
4864
4865        @param device_list: List of peer devices to verify rssi
4866
4867        @raises error.TestNA if any device isn't found or RSSI is too low
4868        """
4869        try:
4870            self.test_start_discovery()
4871            for device in device_list:
4872                # The RSSI property is only maintained while discovery is
4873                # enabled.  Stopping discovery removes the property. Thus, look
4874                # up the RSSI without modifying discovery state.
4875                found = self.test_discover_device(device.address,
4876                                                  start_discovery=False,
4877                                                  stop_discovery=False)
4878                rssi = self.bluetooth_facade.get_device_property(
4879                        device.address, 'RSSI')
4880
4881                if not found:
4882                    # Not clearing self.fails will result in test
4883                    # failing with test_discover_device failure
4884                    self.fails = []
4885                    logging.info(
4886                            'Failing with TEST_NA as peer %s was not'
4887                            ' discovered during RSSI check', device.address)
4888                    raise error.TestNAError(
4889                            'Peer {} not discovered during RSSI check'.format(
4890                                    device.address))
4891
4892                if not rssi or rssi < self.MIN_RSSI:
4893                    logging.info('Failing with TEST_NA since RSSI (%s) is low ',
4894                                  rssi)
4895                    raise error.TestNAError(
4896                            'Peer {} RSSI is too low: {}'.format(
4897                                    device.address, rssi))
4898                device.rssi = rssi
4899
4900                logging.info('Peer {} RSSI {}'.format(device.address, rssi))
4901        finally:
4902            self.test_stop_discovery()
4903
4904            for device in device_list:
4905                logging.info('Clearing device for test: {}'.format(
4906                        device.address))
4907                self.bluetooth_facade.remove_device_object(device.address)
4908
4909    def verify_controller_capability(self, required_roles=[],
4910                                     test_type=''):
4911        """Raise an exception if required role support isn't present
4912
4913        @param required_roles: List of test role requirements in
4914                               ["central", "peripheral", "central-peripheral"]
4915
4916        @raises: error.TestFail if device does not meet requirements
4917                                AND test_type is 'AVL'
4918                 error.TestNA if device does not meet requirements
4919                                and test_type is not 'AVL'
4920        """
4921
4922        adapter_props = self.get_adapter_properties()
4923
4924        supported_roles = adapter_props.get('Roles', [])
4925
4926        for req in required_roles:
4927            if req not in supported_roles:
4928                # We don't meet requirements, throw error
4929                msg = 'Role requirement {} not in supported modes {}'.format(
4930                      req, supported_roles)
4931
4932                if test_type == 'AVL':
4933                    raise error.TestFail(msg)
4934
4935                logging.info('Failing with TEST_NA due to %s', msg)
4936                raise error.TestNAError(msg)
4937
4938
4939    def set_fail_fast(self, args_dict, default=False):
4940        """Set whether the test should fail fast if running into any problem
4941
4942        By default it should not fail fast so that a batch test can continue
4943        running the rest after a failure in one test
4944
4945        :param args_dict: the arguments passed int from the command line
4946        :param default: the default value when the flag is missing from the
4947                        args_dict
4948
4949        """
4950        flag_name = 'fail_fast'
4951        if args_dict and flag_name in args_dict:
4952            self.fail_fast = bool(args_dict[flag_name].lower() == 'true')
4953        else:
4954            self.fail_fast = default
4955
4956
4957    def assert_discover_and_pair(self, device):
4958        """ Discovers and pairs given device. Automatically connects too.
4959
4960        If any of the test expressions fail, it will raise an error so only call
4961        this function as a setup for a test.
4962        """
4963        self.assert_on_fail(self.test_device_set_discoverable(device, True))
4964        self.assert_on_fail(self.test_discover_device(device.address))
4965        self.assert_on_fail(
4966                self.test_pairing(device.address, device.pin, trusted=True))
4967
4968    def identify_platform_failure_reasons(self):
4969        """ Identifies platform failure reasons to watch for in logs """
4970        s = self.bluetooth_facade.get_bt_usb_disconnect_str()
4971        if s:
4972            COMMON_FAILURES[s] = 'USB disconnect detected'
4973
4974    def clean_bluetooth_kernel_log(self, level_name):
4975        """Remove Bluetooth kernel logs in /var/log/messages with equal or lower
4976        prioity than level_name
4977
4978        @param level_name: name of the log level, e.x. 'INFO', 'DEBUG'...
4979        """
4980        self.bluetooth_facade.clean_bluetooth_kernel_log(
4981                KERNEL_LOG_LEVEL[level_name])
4982
4983    def run_once(self, *args, **kwargs):
4984        """This method should be implemented by children classes.
4985
4986        Typically, the run_once() method would look like:
4987
4988        factory = remote_facade_factory.RemoteFacadeFactory(host)
4989        self.bluetooth_facade = factory.create_bluetooth_facade(self.floss)
4990
4991        self.test_bluetoothd_running()
4992        # ...
4993        # invoke more self.test_xxx() tests.
4994        # ...
4995
4996        if self.fails:
4997            raise error.TestFail(self.fails)
4998
4999        """
5000        raise NotImplementedError
5001
5002
5003    def cleanup_bt_test(self, test_state='END'):
5004        """Clean up bluetooth adapter tests.
5005
5006        @param test_state: string describing the requested clear is for
5007                           a new test(NEW), the middle of the test(MID),
5008                           or the end of the test(END).
5009        """
5010
5011        if test_state == 'END':
5012            # Disable all the bluetooth debug logs
5013            self.enable_disable_debug_log(enable=False)
5014
5015            # Re-enable cellular services
5016            self.enable_disable_cellular(enable=True)
5017
5018            # Re-enable ui
5019            self.enable_disable_ui(enable=True)
5020
5021            if hasattr(self, 'host'):
5022                # Stop btmon process
5023                self.host.run('pkill btmon || true')
5024
5025                #Stop tcpdump usbmon process
5026                self.host.run('pkill tcpdump || true')
5027
5028
5029        # Close the device properly if a device is instantiated.
5030        # Note: do not write something like the following statements
5031        #           if self.devices[device_type]:
5032        #       or
5033        #           if bool(self.devices[device_type]):
5034        #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
5035        #       which just does not exist.
5036        for device_name, device_list  in self.devices.items():
5037            for device in device_list:
5038                if device is not None:
5039                    device.Close()
5040
5041                    # Power cycle BT device if we're in the middle of a test
5042                    if test_state == 'MID':
5043                        device.PowerCycle()
5044
5045        self.devices = dict()
5046        for device_type in SUPPORTED_DEVICE_TYPES:
5047            self.devices[device_type] = list()
5048
5049    # Called only by test.test
5050    def cleanup(self):
5051        """Cleanup test.test instance"""
5052
5053        self.cleanup_bt_test()
5054