1# Lint as: python2, python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Server side bluetooth adapter subtests.""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11 12from datetime import datetime, timedelta 13import errno 14import functools 15import six.moves.http_client 16import inspect 17import logging 18import multiprocessing 19import os 20import re 21import socket 22import threading 23import time 24 25import common 26from autotest_lib.client.bin import utils 27from autotest_lib.client.bin.input import input_event_recorder as recorder 28from autotest_lib.client.common_lib import error 29from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket 30from autotest_lib.client.cros.chameleon import chameleon 31from autotest_lib.server.cros.bluetooth import bluetooth_test_utils 32from autotest_lib.server import test 33 34from autotest_lib.client.bin.input.linux_input import ( 35 BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, 36 REL_WHEEL_HI_RES, KEY_PLAYCD, KEY_PAUSECD, KEY_STOPCD, KEY_NEXTSONG, 37 KEY_PREVIOUSSONG) 38from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import ( 39 GATT_ClientFacade, GATT_Application, GATT_HIDApplication) 40from autotest_lib.server.cros.multimedia import remote_facade_factory 41import six 42from six.moves import map 43from six.moves import range 44from six.moves import zip 45 46 47Event = recorder.Event 48 49# We have a number of chipsets that are no longer supported. Known issues 50# related to firmware will be ignored on these devices (b/169328792). 51UNSUPPORTED_CHIPSETS = ['MVL-8897', 'MVL-8997', 'Intel-AC7260', 'Intel-AC7265'] 52 53# Location of data traces relative to this (bluetooth_adapter_tests.py) file 54BT_ADAPTER_TEST_PATH = os.path.dirname(__file__) 55TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard') 56 57RESUME_DELTA = -5 58 59# Delay binding the methods since host is only available at run time. 60SUPPORTED_DEVICE_TYPES = { 61 'MOUSE': lambda btpeer: btpeer.get_bluetooth_hid_mouse, 62 'KEYBOARD': lambda btpeer: btpeer.get_bluetooth_hid_keyboard, 63 'BLE_MOUSE': lambda btpeer: btpeer.get_ble_mouse, 64 'BLE_KEYBOARD': lambda btpeer: btpeer.get_ble_keyboard, 65 # Tester allows us to test DUT's discoverability, etc. from a peer 66 'BLUETOOTH_TESTER': lambda btpeer: btpeer.get_bluetooth_tester, 67 # This is a base object that does not emulate any Bluetooth device. 68 # This object is preferred when only a pure XMLRPC server is needed 69 # on the btpeer host, e.g., to perform servod methods. 70 'BLUETOOTH_BASE': lambda btpeer: btpeer.get_bluetooth_base, 71 # on the chameleon host, e.g., to perform servod methods. 72 'BLUETOOTH_BASE': lambda chameleon: chameleon.get_bluetooth_base, 73 # A phone device that supports Bluetooth 74 'BLE_PHONE': lambda chameleon: chameleon.get_ble_phone, 75 # A Bluetooth audio device emulating a headphone 76 'BLUETOOTH_AUDIO': lambda chameleon: chameleon.get_bluetooth_audio, 77 # A Bluetooth device that implements the Fast Pair protocol. 78 'BLE_FAST_PAIR': lambda chameleon: chameleon.get_ble_fast_pair, 79} 80 81COMMON_FAILURES = { 82 'Freeing adapter /org/bluez/hci': 'adapter_freed', 83 '/var/spool/crash/bluetoothd': 'bluetoothd_crashed', 84 'btintel_hw_error': 'intel hardware error detected', 85 'qca_hw_error': 'qca hardware error detected', 86 'cmd_cnt 0 cmd queued ([5-9]|[1-9][0-9]+)': 'controller cmd capacity', 87} 88 89# TODO(b/150898182) - Don't run some tests on tablet form factors 90# This list was generated by looking for tablet models on Goldeneye and removing 91# the ones that were not launched 92TABLET_MODELS = ['kakadu', 'kodama', 'krane', 'dru', 'druwl', 'dumo'] 93 94# Some platforms do not have built-in I/O hardware, and so they are configured 95# to automatically reconnect to paired HID devices on boot. We note these 96# platform types here as there will be different behavior expectations around 97# reboot. 98RECONNECT_PLATFORM_TYPES = ['CHROMEBOX', 'CHROMEBIT', 'CHROMEBASE'] 99 100# TODO(b/158336394) Realtek: Powers down during suspend due to high power usage 101# during S3. 102# TODO(b/168152910) Marvell: Powers down during suspend due to flakiness when 103# entering suspend. This will also skip the tests 104# for Veyron (which don't power down right now) but 105# reconnect tests are still enabled for that platform 106# to check for suspend stability. 107SUSPEND_POWER_DOWN_CHIPSETS = ['Realtek-RTL8822C-USB', 'MVL-8897', 'MVL-8997'] 108 109# All realtek chipsets on USB will drop its firmware and reload on 110# suspend-resume unless it is connected to a peer device. This doesn't 111# include RTL8822, which would reset regardless of the peer. 112SUSPEND_RESET_IF_NO_PEER_CHIPSETS = ['Realtek-RTL8852A-USB'] 113 114# Models to skip since they power down on suspend. 115SUSPEND_POWER_DOWN_MODELS = ['dru', 'druwl', 'dumo'] 116 117# Chipsets which do not support Bluetooth Hardware Filtering. 118UNSUPPORTED_BT_HW_FILTERING_CHIPSETS = [ 119 'MVL-8897', 'MVL-8997', 'QCA-6174A-5-USB', 'QCA-6174A-3-UART', 120 'QCA-WCN6856', 'Intel-AC7260', 'Intel-AC7265', 'Realtek-RTL8822C-USB', 121 'Realtek-RTL8822C-UART', 'Realtek-RTL8852A-USB', 122 'Mediatek-MTK7921-USB', 'Mediatek-MTK7921-SDIO' 123] 124 125KERNEL_LOG_LEVEL = { 126 'EMERG': 0, 127 'ALERT': 1, 128 'CRIT': 2, 129 'ERR': 3, 130 'WARNING': 4, 131 'NOTICE': 5, 132 'INFO': 6, 133 'DEBUG': 7 134} 135 136# The benchmark criterion to determine whether HID device reconnection is fast 137HID_RECONNECT_TIME_MAX_SEC = 3 138LE_HID_RECONNECT_TIME_MAX_SEC = 3 139 140 141def method_name(): 142 """Get the method name of a class. 143 144 This function is supposed to be invoked inside a class and will 145 return current method name who invokes this function. 146 147 @returns: the string of the method name inside the class. 148 """ 149 return inspect.getouterframes(inspect.currentframe())[1][3] 150 151 152def _run_method(method, method_name, *args, **kwargs): 153 """Run a target method and capture exceptions if any. 154 155 This is just a wrapper of the target method so that we do not need to 156 write the exception capturing structure repeatedly. The method could 157 be either a device method or a facade method. 158 159 @param method: the method to run 160 @param method_name: the name of the method 161 162 @returns: the return value of target method() if successful. 163 False otherwise. 164 165 """ 166 result = False 167 try: 168 result = method(*args, **kwargs) 169 except Exception as e: 170 logging.error('%s: %s', method_name, e) 171 except: 172 logging.error('%s: unexpected error', method_name) 173 return result 174 175 176def get_bluetooth_emulated_device(btpeer, device_type): 177 """Get the bluetooth emulated device object. 178 179 @param btpeer: the Bluetooth peer device 180 @param device_type : the bluetooth device type, e.g., 'MOUSE' 181 182 @returns: the bluetooth device object 183 184 """ 185 186 def _retry_device_method(method_name, legal_falsy_values=[]): 187 """retry the emulated device's method. 188 189 The method is invoked as device.xxxx() e.g., device.GetAdvertisedName(). 190 191 Note that the method name string is provided to get the device's actual 192 method object at run time through getattr(). The rebinding is required 193 because a new device may have been created previously or during the 194 execution of fix_serial_device(). 195 196 Given a device's method, it is not feasible to get the method name 197 through __name__ attribute. This limitation is due to the fact that 198 the device is a dotted object of an XML RPC server proxy. 199 As an example, with the method name 'GetAdvertisedName', we could 200 derive the correspoinding method device.GetAdvertisedName. On the 201 contrary, given device.GetAdvertisedName, it is not feasible to get the 202 method name by device.GetAdvertisedName.__name__ 203 204 Also note that if the device method fails, we would try remediation 205 step and retry the device method. The remediation steps are 206 1) re-creating the serial device. 207 2) reset (powercycle) the bluetooth dongle. 208 3) reboot Bluetooth peer. 209 If the device method still fails after these steps, we fail the test 210 211 The default values exist for uses of this function before the options 212 were added, ideally we should change zero_ok to False. 213 214 @param method_name: the string of the method name. 215 @param legal_falsy_values: Values that are falsy but might be OK. 216 217 @returns: the result returned by the device's method if the call was 218 successful 219 220 @raises: TestError if the devices's method fails or if repair of 221 peripheral kit fails 222 223 """ 224 225 action_table = [('recreate' , 'Fixing the serial device'), 226 ('reset', 'Power cycle the peer device'), 227 ('reboot', 'Reboot the chamleond host')] 228 229 for i, (action, description) in enumerate(action_table): 230 logging.info('Attempt %s : %s ', i+1, method_name) 231 232 result = _run_method(getattr(device, method_name), method_name) 233 if _is_successful(result, legal_falsy_values): 234 return result 235 236 logging.error('%s failed the %s time. Attempting to %s', 237 method_name,i,description) 238 if not fix_serial_device(btpeer, device, action): 239 logging.info('%s failed', description) 240 else: 241 logging.info('%s successful', description) 242 243 #try it last time after fix it by last action 244 result = _run_method(getattr(device, method_name), method_name) 245 if _is_successful(result, legal_falsy_values): 246 return result 247 248 raise error.TestError('Failed to execute %s. Bluetooth peer device is' 249 'not working' % method_name) 250 251 252 if device_type not in SUPPORTED_DEVICE_TYPES: 253 raise error.TestError('The device type is not supported: %s', 254 device_type) 255 256 # Get the bluetooth device object and query some important properties. 257 device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)() 258 259 # Get some properties of the kit 260 # NOTE: Strings updated here must be kept in sync with Btpeer. 261 device._capabilities = _retry_device_method('GetCapabilities') 262 device._transports = device._capabilities["CAP_TRANSPORTS"] 263 device._is_le_only = ("TRANSPORT_LE" in device._transports and 264 len(device._transports) == 1) 265 device._has_pin = device._capabilities["CAP_HAS_PIN"] 266 device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"] 267 268 _retry_device_method('Init') 269 logging.info('device type: %s', device_type) 270 271 device.name = _retry_device_method('GetAdvertisedName') 272 logging.info('device name: %s', device.name) 273 274 device.address = _retry_device_method('GetLocalBluetoothAddress') 275 logging.info('address: %s', device.address) 276 277 pin_falsy_values = [] if device._has_pin else [None] 278 device.pin = _retry_device_method('GetPinCode', pin_falsy_values) 279 logging.info('pin: %s', device.pin) 280 281 class_falsy_values = [None] if device._is_le_only else [0] 282 283 # Class of service is None for LE-only devices. Don't fail or parse it. 284 device.class_of_service = _retry_device_method('GetClassOfService', 285 class_falsy_values) 286 if device._is_le_only: 287 parsed_class_of_service = device.class_of_service 288 else: 289 parsed_class_of_service = "0x%04X" % device.class_of_service 290 logging.info('class of service: %s', parsed_class_of_service) 291 292 device.class_of_device = _retry_device_method('GetClassOfDevice', 293 class_falsy_values) 294 # Class of device is None for LE-only devices. Don't fail or parse it. 295 if device._is_le_only: 296 parsed_class_of_device = device.class_of_device 297 else: 298 parsed_class_of_device = "0x%04X" % device.class_of_device 299 logging.info('class of device: %s', parsed_class_of_device) 300 301 device.device_type = _retry_device_method('GetDeviceType') 302 logging.info('device type: %s', device.device_type) 303 304 device.authentication_mode = None 305 if not device._is_le_only: 306 device.authentication_mode = _retry_device_method('GetAuthenticationMode') 307 logging.info('authentication mode: %s', device.authentication_mode) 308 309 device.port = _retry_device_method('GetPort') 310 logging.info('serial port: %s\n', device.port) 311 312 return device 313 314 315def recreate_serial_device(device): 316 """Create and connect to a new serial device. 317 318 @param device: the bluetooth HID device 319 320 @returns: True if the serial device is re-created successfully. 321 322 """ 323 logging.info('Remove the old serial device and create a new one.') 324 if device is not None: 325 try: 326 device.Close() 327 except: 328 logging.error('failed to close the serial device.') 329 return False 330 try: 331 device.CreateSerialDevice() 332 return True 333 except: 334 logging.error('failed to invoke CreateSerialDevice.') 335 return False 336 337 338def _check_device_init(device, operation): 339 # Check if the serial device could initialize, connect, and 340 # enter command mode correctly. 341 logging.info('Checking device status...') 342 if not _run_method(device.Init, 'Init'): 343 logging.info('device.Init: failed after %s', operation) 344 return False 345 if not device.CheckSerialConnection(): 346 logging.info('device.CheckSerialConnection: failed after %s', operation) 347 return False 348 if not _run_method(device.EnterCommandMode, 'EnterCommandMode'): 349 logging.info('device.EnterCommandMode: failed after %s', operation) 350 return False 351 logging.info('The device is created successfully after %s.', operation) 352 return True 353 354def _reboot_btpeer(btpeer, device): 355 """ Reboot Bluetooth peer device. 356 357 Also power cycle the device since reboot may not do that..""" 358 359 # Chameleond fizz hosts should have write protect removed and 360 # set_gbb_flags set to 0 to minimize boot time 361 REBOOT_SLEEP_SECS = 10 362 RESET_SLEEP_SECS = 1 363 364 # Close the bluetooth peripheral device and reboot the chameleon board. 365 device.Close() 366 logging.info("Powercycling the device") 367 device.PowerCycle() 368 time.sleep(RESET_SLEEP_SECS) 369 logging.info('rebooting Bluetooth peer...') 370 btpeer.reboot() 371 372 # Every btpeer reboot would take a bit more than REBOOT_SLEEP_SECS. 373 # Sleep REBOOT_SLEEP_SECS and then begin probing the btpeer board. 374 time.sleep(REBOOT_SLEEP_SECS) 375 return _check_device_init(device, 'reboot') 376 377def _reset_device_power(device): 378 """Power cycle the device.""" 379 RESET_SLEEP_SECS = 1 380 try: 381 if not device.PowerCycle(): 382 logging.info('device.PowerCycle() failed') 383 return False 384 except: 385 logging.error('exception in device.PowerCycle') 386 else: 387 logging.info('device powercycled') 388 time.sleep(RESET_SLEEP_SECS) 389 return _check_device_init(device, 'reset') 390 391def _is_successful(result, legal_falsy_values=[]): 392 """Is the method result considered successful? 393 394 Some method results, for example that of class_of_service, may be 0 which is 395 considered a valid result. Occassionally, None is acceptable. 396 397 The default values exist for uses of this function before the options were 398 added, ideally we should change zero_ok to False. 399 400 @param result: a method result 401 @param legal_falsy_values: Values that are falsy but might be OK. 402 403 @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if 404 result is None and none_ok. 405 """ 406 truthiness_of_result = bool(result) 407 return truthiness_of_result or result in legal_falsy_values 408 409 410def _flag_common_failures(instance): 411 """Checks if a common failure has occurred during the test run 412 413 Scans system logs for known signs of failure. If a failure is discovered, 414 it is added to the test results, to make it easier to identify common root 415 causes from Stainless 416 """ 417 had_failure = False 418 419 for fail_tag, fail_log in COMMON_FAILURES.items(): 420 if instance.bluetooth_facade.messages_find(fail_tag): 421 had_failure = True 422 logging.error('Detected failure tag: %s', fail_tag) 423 # We mark this instance's results with the discovered failure 424 if type(instance.results) is dict: 425 instance.results[fail_log] = True 426 427 return had_failure 428 429 430def fix_serial_device(btpeer, device, operation='reset'): 431 """Fix the serial device. 432 433 This function tries to fix the serial device by 434 (1) re-creating a serial device, or 435 (2) power cycling the usb port to which device is connected 436 (3) rebooting the Bluetooth peeer 437 438 Argument operation determine which of the steps above are perform 439 440 Note that rebooting the btpeer board or resetting the device will remove 441 the state on the peripheral which might cause test failures. Please use 442 reset/reboot only before or after a test. 443 444 @param btpeer: the Bluetooth peer 445 @param device: the bluetooth device. 446 @param operation: Recovery operation to perform 'recreate/reset/reboot' 447 448 @returns: True if the serial device is fixed. False otherwise. 449 450 """ 451 452 if operation == 'recreate': 453 # Check the serial connection. Fix it if needed. 454 if device.CheckSerialConnection(): 455 # The USB serial connection still exists. 456 # Re-connection suffices to solve the problem. The problem 457 # is usually caused by serial port change. For example, 458 # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1. 459 logging.info('retry: creating a new serial device...') 460 return recreate_serial_device(device) 461 else: 462 # Recreate the bluetooth peer device 463 return _check_device_init(device, operation) 464 465 elif operation == 'reset': 466 # Powercycle the USB port where the bluetooth peer device is connected. 467 # RN-42 and RN-52 share the same vid:pid so both will be powercycled. 468 # This will only work on fizz host with write protection removed. 469 # Note that the state on the device will be lost. 470 return _reset_device_power(device) 471 472 elif operation == 'reboot': 473 # Reboot the Bluetooth peer device. 474 # The device is power cycled before rebooting Bluetooth peer device 475 return _reboot_btpeer(btpeer, device) 476 477 else: 478 logging.error('fix_serial_device Invalid operation %s', operation) 479 return False 480 481 482def retry(test_method, instance, *args, **kwargs): 483 """Execute the target facade test_method(). Retry if failing the first time. 484 485 A test_method is something like self.test_xxxx() in BluetoothAdapterTests, 486 e.g., BluetoothAdapterTests.test_bluetoothd_running(). 487 488 @param test_method: the test method to retry 489 490 @returns: True if the return value of test_method() is successful. 491 False otherwise. 492 493 """ 494 if _is_successful(_run_method(test_method, test_method.__name__, 495 instance, *args, **kwargs)): 496 return True 497 498 # Try to fix the serial device if applicable. 499 logging.error('%s failed at the 1st time: (%s)', test_method.__name__, 500 str(instance.results)) 501 502 # If this test does not use any attached serial device, just re-run 503 # the test. 504 logging.info('%s: retry the 2nd time.', test_method.__name__) 505 time.sleep(1) 506 507 508 if not hasattr(instance, 'use_btpeer'): 509 return _is_successful(_run_method(test_method, test_method.__name__, 510 instance, *args, **kwargs)) 511 for device_type in SUPPORTED_DEVICE_TYPES: 512 for device in getattr(instance, 'devices')[device_type]: 513 #fix_serial_device in 'recreate' mode doesn't require btpeer 514 #so just pass None for convenient. 515 if not fix_serial_device(None, device, "recreate"): 516 return False 517 518 logging.info('%s: retry the 2nd time.', test_method.__name__) 519 return _is_successful(_run_method(test_method, test_method.__name__, 520 instance, *args, **kwargs)) 521 522 523def test_retry_and_log(test_method_or_retry_flag, 524 messages_start=True, 525 messages_stop=True): 526 """A decorator that logs test results, collects error messages, and retries 527 on request. 528 529 @param test_method_or_retry_flag: either the test_method or a retry_flag. 530 There are some possibilities of this argument: 531 1. the test_method to conduct and retry: should retry the test_method. 532 This occurs with 533 @test_retry_and_log 534 2. the retry flag is True. Should retry the test_method. 535 This occurs with 536 @test_retry_and_log(True) 537 3. the retry flag is False. Do not retry the test_method. 538 This occurs with 539 @test_retry_and_log(False) 540 541 @param messages_start: Start collecting messages before running the test 542 @param messages_stop: Stop collecting messages after running the test and 543 analyze the results. 544 545 @returns: a wrapper of the test_method with test log. The retry mechanism 546 would depend on the retry flag. 547 548 """ 549 550 def decorator(test_method): 551 """A decorator wrapper of the decorated test_method. 552 553 @param test_method: the test method being decorated. 554 555 @returns the wrapper of the test method. 556 557 """ 558 @functools.wraps(test_method) 559 def wrapper(instance, *args, **kwargs): 560 """A wrapper of the decorated method. 561 562 @param instance: an BluetoothAdapterTests instance 563 564 @returns the result of the test method 565 566 """ 567 instance.results = None 568 fail_msg = None 569 test_result = False 570 should_raise = hasattr(instance, 'fail_fast') and instance.fail_fast 571 572 instance.last_test_method = test_method.__name__ 573 syslog_captured = False 574 575 try: 576 logging.info('[>>> running: {}]'.format(test_method.__name__)) 577 start_time = time.time() 578 if messages_start: 579 # Grab /var/log/messages output during test run 580 instance.bluetooth_facade.messages_start() 581 582 if callable(test_method_or_retry_flag 583 ) or test_method_or_retry_flag: 584 test_result = retry(test_method, instance, *args, **kwargs) 585 else: 586 test_result = test_method(instance, *args, **kwargs) 587 588 if messages_stop: 589 syslog_captured = instance.bluetooth_facade.messages_stop() 590 591 if syslog_captured: 592 had_failure = _flag_common_failures(instance) 593 instance.had_known_common_failure = any( 594 [instance.had_known_common_failure, had_failure]) 595 596 logging.debug('instance._expected_result : %s', 597 instance._expected_result) 598 elapsed_time = 'elapsed_time: {:.3f}s'.format(time.time() - 599 start_time) 600 if instance._expected_result: 601 if test_result: 602 logging.info('[*** passed: {}] {}'.format( 603 test_method.__name__, elapsed_time)) 604 else: 605 fail_msg = '[--- failed: {} ({})]'.format( 606 test_method.__name__, str(instance.results)) 607 logging.error('{} {}'.format(fail_msg, elapsed_time)) 608 instance.fails.append(fail_msg) 609 else: 610 if test_result: 611 # The test is expected to fail; but it passed. 612 reason = 'expected fail, actually passed' 613 fail_msg = '[--- failed: {} ({})]'.format( 614 test_method.__name__, reason) 615 logging.error('{} {}'.format(fail_msg, elapsed_time)) 616 instance.fails.append(fail_msg) 617 else: 618 # The test is expected to fail; and it did fail. 619 reason = 'expected fail, actually failed' 620 logging.info('[*** passed: {} ({})] {}'.format( 621 test_method.__name__, reason, elapsed_time)) 622 623 # Reset _expected_result and let the quicktest wrapper catch it. 624 # These errors should skip out of the testcase entirely. 625 except (error.TestNAError, error.TestError, error.TestFail): 626 instance._expected_result = True 627 raise 628 629 # Next test_method should pass by default. 630 instance._expected_result = True 631 632 # Check whether we should fail fast 633 if fail_msg and should_raise: 634 logging.info('Fail fast') 635 raise error.TestFail(instance.fails) 636 637 return test_result 638 639 return wrapper 640 641 if callable(test_method_or_retry_flag): 642 # If the decorator function comes with no argument like 643 # @test_retry_and_log 644 return decorator(test_method_or_retry_flag) 645 else: 646 # If the decorator function comes with an argument like 647 # @test_retry_and_log(False) 648 return decorator 649 650 651def test_case_log(method): 652 """A decorator for test case methods. 653 654 The main purpose of this decorator is to display the test case name 655 in the test log which looks like 656 657 <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...> 658 659 @param method: the test case method to decorate. 660 661 @returns: a wrapper function of the decorated method. 662 663 """ 664 @functools.wraps(method) 665 def wrapper(instance, *args, **kwargs): 666 """Log the name of the wrapped method before execution""" 667 logging.info('\n<... %s ...>', method.__name__) 668 method(instance, *args, **kwargs) 669 return wrapper 670 671 672class BluetoothAdapterTests(test.test): 673 """Server side bluetooth adapter tests. 674 675 This test class tries to thoroughly verify most of the important work 676 states of a bluetooth adapter. 677 678 The various test methods are supposed to be invoked by actual autotest 679 tests such as server/cros/site_tests/bluetooth_Adapter*. 680 681 """ 682 version = 1 683 ADAPTER_ACTION_SLEEP_SECS = 1 684 ADAPTER_PAIRING_TIMEOUT_SECS = 60 685 ADAPTER_CONNECTION_TIMEOUT_SECS = 30 686 # Wait after connect for input device to be ready for use 687 ADAPTER_HID_INPUT_DELAY = 5 688 ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30 689 ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3 690 ADAPTER_DISCOVER_TIMEOUT_SECS = 60 # 30 seconds too short sometimes 691 ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1 692 ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30 693 ADAPTER_WAKE_ENABLE_TIMEOUT_SECS = 30 694 695 ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10 696 ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1 697 698 HID_REPORT_SLEEP_SECS = 1 699 700 701 DEFAULT_START_DELAY_SECS = 0 702 DEFAULT_HOLD_INTERVAL_SECS = 10 703 DEFAULT_HOLD_TIMEOUT_SECS = 60 704 DEFAULT_HOLD_SLEEP_SECS = 1 705 706 # Default suspend time in seconds for suspend resume. 707 SUSPEND_TIME_SECS=10 708 SUSPEND_ENTER_SECS=10 709 RESUME_TIME_SECS=30 710 RESUME_INTERNAL_TIMEOUT_SECS = 180 711 712 # Minimum RSSI required for peer devices during testing 713 MIN_RSSI = -70 714 715 # hci0 is the default hci device if there is no external bluetooth dongle. 716 EXPECTED_HCI = 'hci0' 717 718 CLASS_OF_SERVICE_MASK = 0xFFE000 719 CLASS_OF_DEVICE_MASK = 0x001FFF 720 721 # Constants about advertising. 722 DEFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 200 723 DEFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 200 724 ADVERTISING_INTERVAL_UNIT = 0.625 725 726 # Error messages about advertising dbus methods. 727 ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = ( 728 'org.bluez.Error.NotPermitted: Maximum advertisements reached') 729 ERROR_INVALID_ADVERTISING_INTERVALS = ( 730 'org.bluez.Error.InvalidArguments: Invalid arguments') 731 732 # Supported profiles by ChromeOS. 733 SUPPORTED_UUIDS = { 734 'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb', 735 'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb', 736 'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb', 737 'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb', 738 'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'} 739 740 # Board list for name/ID test check. These devices don't need to be tested 741 REFERENCE_BOARDS = [ 742 'rambi', 'nyan', 'oak', 'reef', 'yorp', 'bip', 'volteer', 743 'volteer2' 744 ] 745 746 # Path for btmon logs 747 BTMON_DIR_LOG_PATH = '/var/log/btmon' 748 749 # Path for usbmon logs 750 USBMON_DIR_LOG_PATH = '/var/log/usbmon' 751 752 # Parameters for usbmon log rotation 753 USBMON_SINGLE_FILE_MAX_SIZE = '10M' # 10M bytes 754 USBMON_NUM_OF_ROTATE_FILE = 2 755 756 # The agent capability of various device types. 757 # Currently all non-Fast Pair are set to NoInputNoOutput since currently 758 # we don't have a way to report the displayed passkey to the device in case 759 # of Passkey Entry. Therefore, use 'Just Works'. 760 # Fast Pair uses DisplayYesNo because this is expected by that protocol. 761 # TODO(b/181945748) update the capabilities when Passkey Entry is supported. 762 AGENT_CAPABILITY = { 763 'BLE_MOUSE': 'NoInputNoOutput', 764 'BLE_KEYBOARD': 'NoInputNoOutput', 765 'BLE_PHONE': 'NoInputNoOutput', 766 'BLUETOOTH_AUDIO': 'NoInputNoOutput', 767 'FAST_PAIR': 'DisplayYesNo', 768 'KEYBOARD': 'NoInputNoOutput', 769 'MOUSE': 'NoInputNoOutput', 770 } 771 772 def assert_on_fail(self, result, raiseNA=False): 773 """ If the called function returns a false-like value, raise an error. 774 775 Call test methods (i.e. with @test_retry_and_log) wrapped with this 776 function and failures will raise instead of continuing the test. 777 778 For example: 779 self.assert_on_fail(self.test_pairing(...)) 780 781 @param result: Result of test method called. 782 @param raiseNA: Whether to raise TestNAError instead of TestFail 783 784 @raises error.TestNAError 785 @raises error.TestFail 786 """ 787 if not result: 788 failure_msg = 'Assert on fail: {}'.format(self.last_test_method) 789 logging.error(failure_msg) 790 if raiseNA: 791 raise error.TestNAError(failure_msg) 792 else: 793 raise error.TestFail(failure_msg) 794 795 796 def expect_fail(self, test_method, *args, **kwargs): 797 """Run the test_method which is expected to fail. 798 799 Here a test means one that comes with the @test_retry_and_log 800 decorator. 801 802 In most cases, a test is expected to pass by default. However, in 803 some cases, we may expect a test to fail. As an example, a test is 804 expected to fail if the behavior is disallowed by the policy. In 805 this case, the failure is considered a pass. The example statements 806 look like 807 808 # Set an arbitrary UUID that disallows the HID device. 809 self.test_check_set_allowlist('0xabcd', True) 810 811 # Since the HID device is disallowed above, 812 # the self.test_keyboard_input_from_trace test itself would 813 # fail which is expected. 814 self.expect_fail(self.test_keyboard_input_from_trace, 815 device, "simple_text") 816 817 In the log, the message would show that the keyboard input failed as 818 819 test_keyboard_input_from_trace: InputEventRecorderError: Failed to 820 find the device node of KEYBD_REF. 821 822 As a result, the log message shows that the test conceptually passed. 823 824 [*** passed: test_keyboard_input_from_trace (expected fail, 825 actually failed)] 826 827 @param test_method: the test method to run. 828 829 @returns: True if the test method failed; False otherwise. 830 """ 831 self._expected_result = False 832 logging.debug('self._expected_result %s', self._expected_result) 833 return test_method(*args, **kwargs) 834 835 836 def expect_test(self, expected_result, test_method, *args, **kwargs): 837 """Run the test method and expect the test result as expected_result. 838 839 This little helper is used to make simple the following statements 840 841 if expected_result: 842 self.test_xxx(device) 843 else: 844 self.expect_fail(self.test_xxx, device) 845 846 which can be converted to 847 848 self.expect_test(expected_result, self.test_xxx, device) 849 850 @param expected_result: True if the test is expected to pass; 851 False otherwise. 852 @param test_method: the test method to run. 853 854 @returns: True if the test result matches expected_result; 855 False otherwise. 856 """ 857 if expected_result: 858 # If the test is expected to pass, just run it normally. 859 return test_method(*args, **kwargs) 860 else: 861 # If the test is expected to fail, run it throuigh self.expect_fail 862 # to handle the failure. 863 return self.expect_fail(test_method, *args, **kwargs) 864 865 866 # TODO(b/131170539) remove when sarien/arcada no longer have _signed 867 # postfix 868 def get_base_platform_name(self): 869 """Returns the DUT platform name 870 871 If the DUT is a DVT device, _signed or _unsigned may be appended 872 to the device name, which we should ignore in our BT tests 873 874 @returns: String name of the DUT's platform with _signed or 875 _unsigned removed 876 """ 877 878 platform = self.host.get_platform() 879 880 return platform.replace('_signed', '').replace('_unsigned', '') 881 882 def platform_will_reconnect_on_boot(self): 883 """Indicates if we should expect DUT to automatically reconnect on boot 884 885 Some platforms do not have built-in I/O (i.e. ChromeBox) and will 886 automatically reconnect to paired HID devices on boot. 887 888 @returns: True if platform will reconnect on boot, else False 889 """ 890 891 return self.host.get_board_type() in RECONNECT_PLATFORM_TYPES 892 893 def group_btpeers_type(self): 894 """Group all Bluetooth peers by the type of their detected device.""" 895 896 # Use previously created btpeer_group instead of creating new 897 if len(self.btpeer_group_copy) > 0: 898 logging.info('Using previously created btpeer group') 899 for device_type in SUPPORTED_DEVICE_TYPES: 900 self.btpeer_group[device_type] = \ 901 self.btpeer_group_copy[device_type][:] 902 return 903 904 # Create new btpeer_group 905 for device_type in SUPPORTED_DEVICE_TYPES: 906 self.btpeer_group[device_type] = list() 907 # Create copy of btpeer_group 908 self.btpeer_group_copy[device_type] = list() 909 910 for idx, btpeer in enumerate(self.host.btpeer_list): 911 for device_type,gen_device_func in SUPPORTED_DEVICE_TYPES.items(): 912 try: 913 device = gen_device_func(btpeer)() 914 if device.CheckSerialConnection(): 915 self.btpeer_group[device_type].append(btpeer) 916 logging.debug('%d-th btpeer find device %s', \ 917 idx, device_type) 918 # Create copy of btpeer_group 919 self.btpeer_group_copy[device_type].append(btpeer) 920 except: 921 logging.debug('Error with initializing %s on %d-th' 922 'btpeer', device_type, idx) 923 if len(self.btpeer_group[device_type]) == 0: 924 logging.error('No device is detected on %d-th btpeer', idx) 925 926 logging.debug("self.bt_group is %s",self.btpeer_group) 927 928 929 def wait_for_device(self, device, timeout=10): 930 """Waits for device to become available again 931 932 We reset raspberry pi peer between tests. This method helps us wait to 933 prevent us from trying to use the device before it comes back up again. 934 935 @param device: proxy object of peripheral device 936 """ 937 938 def is_device_ready(): 939 """Tries to use a service of the device 940 941 @returns: True if device is available to provide service 942 False otherwise 943 """ 944 945 try: 946 # Call a simple (fast) function to determine if device is online 947 # and reachable. If we can query this property, we know the 948 # device is available for us to use 949 getattr(device, 'GetCapabilities')() 950 951 except Exception as e: 952 return False 953 954 return True 955 956 957 try: 958 utils.poll_for_condition(condition=is_device_ready, 959 desc='wait_for_device', 960 timeout=timeout) 961 962 except utils.TimeoutError as e: 963 raise error.TestError('Peer is not available after waiting') 964 965 966 def clear_raspi_device(self, device, next_device_type=None): 967 """Clears a device on a raspi peer by resetting bluetooth stack 968 969 @param device: proxy object of peripheral device 970 """ 971 972 try: 973 device.ResetStack(next_device_type) 974 975 except socket.error as e: 976 # Ignore conn reset, expected during stack reset 977 if e.errno != errno.ECONNRESET: 978 raise 979 980 except chameleon.ChameleonConnectionError as e: 981 # Ignore chameleon conn reset, expected during stack reset 982 if (str(errno.ECONNRESET) not in str(e) and 983 'ResetStack' not in str(e)): 984 raise 985 logging.info('Ignored exception due to ResetStack: %s', str(e)) 986 987 except six.moves.http_client.BadStatusLine as e: 988 # BadStatusLine occurs occasionally when chameleon 989 # is restarted. We ignore it here 990 logging.error('Ignoring badstatusline exception') 991 pass 992 993 # Catch generic Fault exception by rpc server, ignore 994 # method not available as it indicates platform didn't 995 # support method and that's ok 996 except Exception as e: 997 if not (e.__class__.__name__ == 'Fault' and 998 'is not supported' in str(e)): 999 raise 1000 1001 # Ensure device is back online before continuing 1002 self.wait_for_device(device, timeout=30) 1003 1004 def device_set_powered(self, device, powered): 1005 """Set raspi BT powered state. 1006 1007 @param powered: Powered state to set on Raspi. 1008 """ 1009 if powered: 1010 device.AdapterPowerOn() 1011 else: 1012 device.AdapterPowerOff() 1013 1014 def get_device_rasp(self, device_num, on_start=True): 1015 """Get all bluetooth device objects from Bluetooth peer devices 1016 This method should be called only after group_btpeers_type 1017 @param device_num : dict of {device_type:number}, to specify the number 1018 of device needed for each device_type. 1019 1020 @param on_start: boolean describing whether the requested clear is for a 1021 new test, or in the middle of a current one 1022 1023 @returns: True if Success. 1024 """ 1025 1026 logging.info("in get_device_rasp %s onstart %s", device_num, on_start) 1027 total_num_devices = sum(device_num.values()) 1028 if total_num_devices > len(self.host.btpeer_list): 1029 logging.error( 1030 'Total number of devices %s is greater than the' 1031 ' number of Bluetooth peers %s', total_num_devices, 1032 len(self.host.btpeer_list)) 1033 return False 1034 1035 for device_type, number in device_num.items(): 1036 total_num_devices += number 1037 if len(self.btpeer_group[device_type]) < number: 1038 logging.error('Number of Bluetooth peers with device type' 1039 '%s is %d, which is less then needed %d', device_type, 1040 len(self.btpeer_group[device_type]), number) 1041 return False 1042 1043 for btpeer in self.btpeer_group[device_type][:number]: 1044 logging.info("getting emulated %s", device_type) 1045 device = self.reset_btpeer(btpeer, device_type, on_start) 1046 1047 self.devices[device_type].append(device) 1048 1049 # Remove this btpeer from btpeer_group since it is already 1050 # configured as a specific device 1051 for temp_device in SUPPORTED_DEVICE_TYPES: 1052 if btpeer in self.btpeer_group[temp_device]: 1053 self.btpeer_group[temp_device].remove(btpeer) 1054 1055 return True 1056 1057 def get_peer_device_type(self, device): 1058 """Determine the type of peer a device is emulating 1059 1060 Sometimes it is useful to be able to flexibly determine what type of 1061 peripheral a device is emulating. This helper function does a reverse 1062 look-up to determine what type it was registered as. 1063 1064 @param device: the emulated peer device 1065 1066 @returns: the emulated device type if found, e.g. 'MOUSE' or 1067 'BLE_KEYBOARD', else None 1068 """ 1069 1070 for device_type, device_list in self.devices.items(): 1071 if device in device_list: 1072 return device_type 1073 1074 return None 1075 1076 def get_device(self, device_type, on_start=True): 1077 """Get the bluetooth device object. 1078 1079 @param device_type : the bluetooth device type, e.g., 'MOUSE' 1080 1081 @param on_start: boolean describing whether the requested clear is for a 1082 new test, or in the middle of a current one 1083 1084 @returns: the bluetooth device object 1085 1086 """ 1087 1088 self.devices[device_type].append( 1089 self.reset_btpeer(self.host.btpeer, device_type, on_start)) 1090 1091 return self.devices[device_type][-1] 1092 1093 1094 def reset_emulated_device(self, device, device_type, clear_device=True): 1095 """Reset the emulated device in order to be used as a different type. 1096 1097 @param device: the emulated peer device to reset with new device type 1098 @param device_type : the new bluetooth device type, e.g., 'MOUSE' 1099 @param clear_device: whether to clear the device state 1100 1101 @returns: the bluetooth device object 1102 1103 """ 1104 # Re-fresh device to clean state if test is starting 1105 if clear_device: 1106 self.clear_raspi_device(device, next_device_type=device_type) 1107 1108 try: 1109 # Tell generic chameleon to bind to this device type 1110 device.SpecifyDeviceType(device_type) 1111 1112 # Catch generic Fault exception by rpc server, ignore method not 1113 # available as it indicates platform didn't support method and that's 1114 # ok 1115 except Exception as e: 1116 if not (e.__class__.__name__ == 'Fault' and 1117 'is not supported' in str(e)): 1118 logging.error("got exception %s", str(e)) 1119 raise 1120 1121 return device 1122 1123 def reset_btpeer(self, peer, device_type, clear_device=True): 1124 """Reset the btpeer device in order to be used as a different type. 1125 1126 @param peer: the btpeer device to reset with new device type 1127 @param device_type : the new bluetooth device type, e.g., 'MOUSE' 1128 @param clear_device: whether to clear the device state 1129 1130 @returns: the bluetooth device object 1131 1132 """ 1133 device = get_bluetooth_emulated_device(peer, device_type) 1134 1135 return self.reset_emulated_device(device, device_type, clear_device) 1136 1137 def is_device_available(self, btpeer, device_type): 1138 """Determines if the named device is available on the linked peer 1139 1140 @param device_type: the bluetooth HID device type, e.g., 'MOUSE' 1141 1142 @returns: True if it is able to resolve the device, false otherwise 1143 """ 1144 1145 device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)() 1146 try: 1147 # The proxy prevents us from checking if the object is None directly 1148 # so instead we call a fast method that any peripheral must support. 1149 # This will fail if the object over the proxy doesn't exist 1150 getattr(device, 'GetCapabilities')() 1151 1152 except Exception as e: 1153 return False 1154 1155 return True 1156 1157 1158 def list_devices_available(self): 1159 """Queries which devices are available on btpeer(s) 1160 1161 @returns: dict mapping HID device types to number of supporting peers 1162 available, e.g. {'MOUSE':1, 'KEYBOARD':1} 1163 """ 1164 devices_available = {} 1165 for device_type in SUPPORTED_DEVICE_TYPES: 1166 for btpeer in self.host.btpeer_list: 1167 if self.is_device_available(btpeer, device_type): 1168 devices_available[device_type] = \ 1169 devices_available.get(device_type, 0) + 1 1170 1171 logging.debug("devices available are %s", devices_available) 1172 return devices_available 1173 1174 1175 def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS): 1176 """Suspend the DUT for a while and then resume. 1177 1178 @param suspend_time: the suspend time in secs 1179 @raises errors.TestFail if the device reboots during suspend 1180 1181 """ 1182 boot_id = self.host.get_boot_id() 1183 suspend = self.suspend_async(suspend_time=suspend_time) 1184 start_time = self.bluetooth_facade.get_device_utc_time() 1185 1186 # Give the system some time to enter suspend 1187 self.test_suspend_and_wait_for_sleep( 1188 suspend, sleep_timeout=self.SUSPEND_ENTER_SECS) 1189 1190 # Wait for resume - since we're not testing suspend itself, we are 1191 # lenient with the resume time here 1192 self.test_wait_for_resume(boot_id, 1193 suspend, 1194 resume_timeout=suspend_time, 1195 test_start_time=start_time) 1196 1197 1198 def reboot(self): 1199 """Reboot the DUT and recreate necessary processes and variables""" 1200 self.host.reboot() 1201 1202 # We need to recreate the bluetooth_facade after a reboot. 1203 # Delete the proxy first so it won't delete the old one, which 1204 # invokes disconnection, after creating the new one. 1205 if hasattr(self, 'factory'): 1206 del self.factory 1207 if hasattr(self, 'bluetooth_facade'): 1208 del self.bluetooth_facade 1209 if hasattr(self, 'input_facade'): 1210 del self.input_facade 1211 self.factory = remote_facade_factory.RemoteFacadeFactory( 1212 self.host, 1213 disable_arc=True, 1214 no_chrome=not self.start_browser, 1215 force_python3=True) 1216 self.bluetooth_facade = self.factory.create_bluetooth_facade( 1217 self.floss) 1218 self.input_facade = self.factory.create_input_facade() 1219 1220 # Re-enable debugging verbose since Chrome will set it to 1221 # default(disable). 1222 self.enable_disable_debug_log(enable=True) 1223 1224 # Re-disable cellular 1225 self.enable_disable_cellular(enable=False) 1226 1227 # Re-disable ui 1228 self.enable_disable_ui(enable=False) 1229 1230 self.start_new_btmon() 1231 self.start_new_usbmon(reboot=True) 1232 1233 1234 def _wait_till_condition_holds(self, func, method_name, 1235 timeout=DEFAULT_HOLD_TIMEOUT_SECS, 1236 sleep_interval=DEFAULT_HOLD_SLEEP_SECS, 1237 hold_interval=DEFAULT_HOLD_INTERVAL_SECS, 1238 start_delay=DEFAULT_START_DELAY_SECS): 1239 """ Wait for the func() to hold true for a period of time 1240 1241 1242 @param func: the function to wait for. 1243 @param method_name: the invoking class method. 1244 @param timeout: number of seconds to wait before giving up. 1245 @param sleep_interval: the interval in seconds to sleep between 1246 invoking func(). 1247 @param hold_interval: the interval in seconds for the condition to 1248 remain true 1249 @param start_delay: interval in seconds to wait before starting 1250 1251 @returns: True if the condition is met, 1252 False otherwise 1253 1254 """ 1255 if start_delay > 0: 1256 logging.debug('waiting for %s secs before checking %s',start_delay, 1257 method_name) 1258 time.sleep(start_delay) 1259 1260 try: 1261 utils.poll_till_condition_holds(condition=func, 1262 timeout=timeout, 1263 sleep_interval=sleep_interval, 1264 hold_interval = hold_interval, 1265 desc=('Waiting %s' % method_name)) 1266 return True 1267 except utils.TimeoutError as e: 1268 logging.error('%s: %s', method_name, e) 1269 except Exception as e: 1270 logging.error('%s: %s', method_name, e) 1271 err = 'bluetoothd possibly crashed. Check out /var/log/messages.' 1272 logging.error(err) 1273 except: 1274 logging.error('%s: unexpected error', method_name) 1275 return False 1276 1277 1278 def _wait_for_condition(self, func, method_name, 1279 timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS, 1280 sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS, 1281 start_delay=DEFAULT_START_DELAY_SECS): 1282 """Wait for the func() to become True. 1283 1284 @param func: the function to wait for. 1285 @param method_name: the invoking class method. 1286 @param timeout: number of seconds to wait before giving up. 1287 @param sleep_interval: the interval in seconds to sleep between 1288 invoking func(). 1289 @param start_delay: interval in seconds to wait before starting 1290 1291 @returns: True if the condition is met, 1292 False otherwise 1293 1294 """ 1295 1296 if start_delay > 0: 1297 logging.debug('waiting for %s secs before checking %s',start_delay, 1298 method_name) 1299 time.sleep(start_delay) 1300 1301 try: 1302 utils.poll_for_condition(condition=func, 1303 timeout=timeout, 1304 sleep_interval=sleep_interval, 1305 desc=('Waiting %s' % method_name)) 1306 return True 1307 except utils.TimeoutError as e: 1308 logging.error('%s: %s', method_name, e) 1309 except Exception as e: 1310 logging.error('%s: %s', method_name, e) 1311 err = 'bluetoothd possibly crashed. Check out /var/log/messages.' 1312 logging.error(err) 1313 except: 1314 logging.error('%s: unexpected error', method_name) 1315 return False 1316 1317 def ignore_failure(instance, test_method, *args, **kwargs): 1318 """ Wrapper to prevent a test_method failure from failing the test batch 1319 1320 Sometimes a test method needs to be used as a normal function, for its 1321 result. This wrapper prevent test_method failure being recorded in 1322 instance.fails and causing a failure of the quick test batch. 1323 1324 @param test_method: test_method 1325 @returns: result of the test_method 1326 """ 1327 1328 original_fails = instance.fails[:] 1329 test_result = test_method(*args, **kwargs) 1330 if not test_result: 1331 logging.info("%s failure is ignored",test_method.__name__) 1332 instance.fails = original_fails 1333 return test_result 1334 1335 1336 def start_agent(self, device): 1337 """Start the pairing agent of the device if applicable. 1338 1339 @param device: the peer device 1340 """ 1341 dev_type = device.GetDeviceType() 1342 capability = self.AGENT_CAPABILITY.get(dev_type) 1343 if capability: 1344 device.StartPairingAgent(capability) 1345 1346 1347 def stop_agent(self, device): 1348 """Stop the pairing agent of the device if applicable. 1349 1350 @param device: the peer device 1351 """ 1352 dev_type = device.GetDeviceType() 1353 capability = self.AGENT_CAPABILITY.get(dev_type) 1354 if capability: 1355 device.StopPairingAgent() 1356 1357 1358 # ------------------------------------------------------------------- 1359 # Adater standalone tests 1360 # ------------------------------------------------------------------- 1361 1362 1363 def service_exists(self, service_name): 1364 """Checks if a service exists on the DUT 1365 1366 @param service_name: name of the service 1367 1368 @returns: True if service status can be queried, else False 1369 """ 1370 1371 status_cmd = 'initctl status {}'.format(service_name) 1372 try: 1373 # Querying the status of a non-existent service throws an 1374 # AutoservRunError exception. If no exception is thrown, we know 1375 # the service exists 1376 self.host.run(status_cmd) 1377 1378 except error.AutoservRunError: 1379 return False 1380 1381 return True 1382 1383 1384 def service_enabled(self, service_name): 1385 """Checks if a service is running on the DUT 1386 1387 @param service_name: name of the service 1388 1389 @throws: AutoservRunError is thrown if there is no service with the 1390 provided name installed on the DUT. 1391 1392 @returns: True if service is currently running, else False 1393 """ 1394 1395 status_cmd = 'initctl status {}'.format(service_name) 1396 output = self.host.run(status_cmd).stdout 1397 1398 return 'start/running' in output 1399 1400 1401 def _initctl_services(self, services, command): 1402 """Use initctl to control service on the DUT 1403 1404 @param services: list of string service names 1405 @param command: initctl command on the services 1406 'start': to start the service 1407 'stop': to stop the service 1408 'restart': to restart the service 1409 1410 @returns: True if services were set successfully, else False 1411 """ 1412 for service in services: 1413 # Some platforms will not support all services. In these cases, 1414 # no need to fail, since they won't interfere with our tests 1415 if not self.service_exists(service): 1416 logging.debug('Service %s does not exist on DUT', service) 1417 continue 1418 1419 # A sample call to enable or disable a service is as follows: 1420 # "initctl stop modemfwd" 1421 if command in ['start', 'stop']: 1422 enable = command == 'start' 1423 if self.service_enabled(service) != enable: 1424 self.host.run('initctl {} {}'.format(command, service)) 1425 1426 if self.service_enabled(service) != enable: 1427 logging.error('Failed to set initctl service to state %d', 1428 enable) 1429 return False 1430 1431 if enable: 1432 logging.info('Service {} enabled'.format(service)) 1433 else: 1434 logging.info('Service {} disabled'.format(service)) 1435 1436 elif command == 'restart': 1437 if self.service_enabled(service): 1438 self.host.run('initctl {} {}'.format(command, service)) 1439 else: 1440 # Just start a stopped job. 1441 self.host.run('initctl {} {}'.format('start', service)) 1442 logging.info('Service {} restarted'.format(service)) 1443 1444 else: 1445 logging.error('unknown command {} on services {}'.format( 1446 command, services)) 1447 return False 1448 1449 return True 1450 1451 1452 def enable_disable_services(self, services, enable): 1453 """Enable or disable service on the DUT 1454 1455 @param services: list of string service names 1456 @param enable: True to enable services, False to disable 1457 1458 @returns: True if services were set successfully, else False 1459 """ 1460 command = 'start' if enable else 'stop' 1461 return self._initctl_services(services, command) 1462 1463 1464 def enable_disable_cellular(self, enable): 1465 """Enable cellular services on the DUT 1466 1467 @param enable: True to enable cellular services 1468 False to disable cellular services 1469 1470 @returns: True if services were set successfully, else False 1471 """ 1472 cellular_services = ['modemmanager', 'modemfwd'] 1473 1474 return self.enable_disable_services(cellular_services, enable) 1475 1476 1477 def enable_disable_ui(self, enable): 1478 """Enable UI service on the DUT 1479 1480 @param enable: True to enable UI services 1481 False to disable UI services 1482 1483 @returns: True if services were set successfully, else False 1484 """ 1485 ui_services = ['ui'] 1486 1487 return self.enable_disable_services(ui_services, enable) 1488 1489 1490 def restart_services(self, services): 1491 """Restart a service on the DUT 1492 1493 @param services: the services, e.g., ['cras',] 1494 1495 @returns: True if services were set successfully, else False 1496 """ 1497 return self._initctl_services(services, 'restart') 1498 1499 1500 def restart_cras(self): 1501 """Restart the cras service on the DUT 1502 1503 @returns: True if cras was restart successfully, else False 1504 """ 1505 return self.bluetooth_facade.restart_cras() 1506 1507 1508 def enable_disable_debug_log(self, enable): 1509 """Enable or disable debug log in DUT 1510 @param enable: True to enable all of the debug log, 1511 False to disable all of the debug log. 1512 """ 1513 level = int(enable) 1514 self.bluetooth_facade.set_debug_log_levels(level, level) 1515 1516 def enable_disable_quality_debug_log(self, enable): 1517 """Enable or disable bluez quality debug log in the DUT 1518 @param enable: True to enable all of the debug log, 1519 False to disable all of the debug log. 1520 """ 1521 self.bluetooth_facade.set_quality_debug_log(bool(enable)) 1522 1523 def start_new_btmon(self): 1524 """ Start a new btmon process and save the log """ 1525 1526 # Kill all btmon process before creating a new one 1527 self.host.run('pkill btmon || true') 1528 1529 # Make sure the directory exists 1530 self.host.run('mkdir -p %s' % self.BTMON_DIR_LOG_PATH) 1531 1532 # Time format. Ex, 2020_02_20_17_52_45 1533 now = time.strftime("%Y_%m_%d_%H_%M_%S") 1534 file_name = 'btsnoop_%s' % now 1535 1536 path = os.path.join(self.BTMON_DIR_LOG_PATH, file_name) 1537 self.host.run_background('btmon -SAw %s' % path) 1538 return path 1539 1540 def start_new_usbmon(self, reboot=False): 1541 """ Start a new USBMON process and save the log 1542 1543 @param reboot: True to indicate we are starting new usbmon on reboot 1544 False otherwise 1545 """ 1546 1547 # Kill all usbmon process before creating a new one 1548 self.host.run('pkill tcpdump || true') 1549 1550 # Delete usbmon logs from previous tests unless we are starting another 1551 # usbmon because of reboot. 1552 if not reboot: 1553 self.host.run('rm -f %s/*' % (self.USBMON_DIR_LOG_PATH)) 1554 1555 # Make sure the directory exists 1556 self.host.run('mkdir -p %s' % self.USBMON_DIR_LOG_PATH) 1557 1558 # Time format. Ex, 2020_02_20_17_52_45 1559 now = time.strftime("%Y_%m_%d_%H_%M_%S") 1560 file_name = 'usbmon_%s' % now 1561 self.host.run_background('tcpdump -i usbmon0 -w %s/%s -C %s -W %d' % 1562 (self.USBMON_DIR_LOG_PATH, file_name, 1563 self.USBMON_SINGLE_FILE_MAX_SIZE, 1564 self.USBMON_NUM_OF_ROTATE_FILE)) 1565 1566 1567 def log_message(self, msg): 1568 """ Write a string to log.""" 1569 self.bluetooth_facade.log_message(msg) 1570 1571 def is_wrt_supported(self): 1572 """ Check if Bluetooth adapter support WRT logs. """ 1573 return self.bluetooth_facade.is_wrt_supported() 1574 1575 def enable_wrt_logs(self): 1576 """ Enable WRT logs from Intel Adapters.""" 1577 return self.bluetooth_facade.enable_wrt_logs() 1578 1579 def collect_wrt_logs(self): 1580 """ Collect WRT logs from Intel Adapters.""" 1581 return self.bluetooth_facade.collect_wrt_logs() 1582 1583 @test_retry_and_log 1584 def test_bluetoothd_running(self): 1585 """Test that bluetoothd is running.""" 1586 return self.bluetooth_facade.is_bluetoothd_running() 1587 1588 1589 @test_retry_and_log 1590 def test_start_bluetoothd(self): 1591 """Test that bluetoothd could be started successfully.""" 1592 return self.bluetooth_facade.start_bluetoothd() 1593 1594 1595 @test_retry_and_log 1596 def test_stop_bluetoothd(self): 1597 """Test that bluetoothd could be stopped successfully.""" 1598 return self.bluetooth_facade.stop_bluetoothd() 1599 1600 1601 @test_retry_and_log 1602 def test_has_adapter(self): 1603 """Verify that there is an adapter. This will return True only if both 1604 the kernel and bluetooth daemon see the adapter. 1605 """ 1606 return self.bluetooth_facade.has_adapter() 1607 1608 @test_retry_and_log 1609 def test_adapter_work_state(self): 1610 """Test that the bluetooth adapter is in the correct working state. 1611 1612 This includes that the adapter is detectable, is powered on, 1613 and its hci device is hci0. 1614 """ 1615 has_adapter = self.bluetooth_facade.has_adapter() 1616 is_powered_on = self._wait_for_condition( 1617 self.bluetooth_facade.is_powered_on, method_name()) 1618 hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI 1619 self.results = { 1620 'has_adapter': has_adapter, 1621 'is_powered_on': is_powered_on, 1622 'hci': hci} 1623 return all(self.results.values()) 1624 1625 @test_retry_and_log(False) 1626 def test_adapter_wake_enabled(self): 1627 """Test that the bluetooth adapter is wakeup enabled. 1628 """ 1629 wake_enabled = self._wait_for_condition( 1630 self.bluetooth_facade.is_wake_enabled, method_name(), 1631 timeout=self.ADAPTER_WAKE_ENABLE_TIMEOUT_SECS) 1632 1633 self.results = { 'wake_enabled': wake_enabled } 1634 return any(self.results.values()) 1635 1636 @test_retry_and_log(False) 1637 def test_device_wake_allowed(self, device_address): 1638 """Test that given device can wake the system.""" 1639 self.results = { 1640 'Wake allowed': 1641 self.bluetooth_facade.get_device_property( 1642 device_address, 'WakeAllowed') 1643 } 1644 1645 return all(self.results.values()) 1646 1647 @test_retry_and_log(False) 1648 def test_device_wake_not_allowed(self, device_address): 1649 """Test that given device cannot wake the system.""" 1650 self.results = { 1651 'Wake not allowed': 1652 not self.bluetooth_facade.get_device_property( 1653 device_address, 'WakeAllowed') 1654 } 1655 1656 return all(self.results.values()) 1657 1658 @test_retry_and_log(False) 1659 def test_adapter_set_wake_disabled(self): 1660 """Disable wake and verify it was written. """ 1661 success = self.bluetooth_facade.set_wake_enabled(False) 1662 self.results = { 'disable_wake': success } 1663 return all(self.results.values()) 1664 1665 @test_retry_and_log 1666 def test_power_on_adapter(self): 1667 """Test that the adapter could be powered on successfully.""" 1668 power_on = self.bluetooth_facade.set_powered(True) 1669 is_powered_on = self._wait_for_condition( 1670 self.bluetooth_facade.is_powered_on, method_name()) 1671 1672 self.results = {'power_on': power_on, 'is_powered_on': is_powered_on} 1673 return all(self.results.values()) 1674 1675 1676 @test_retry_and_log 1677 def test_power_off_adapter(self): 1678 """Test that the adapter could be powered off successfully.""" 1679 power_off = self.bluetooth_facade.set_powered(False) 1680 is_powered_off = self._wait_for_condition( 1681 lambda: not self.bluetooth_facade.is_powered_on(), 1682 method_name()) 1683 1684 self.results = { 1685 'power_off': power_off, 1686 'is_powered_off': is_powered_off} 1687 return all(self.results.values()) 1688 1689 1690 @test_retry_and_log 1691 def test_reset_on_adapter(self): 1692 """Test that the adapter could be reset on successfully. 1693 1694 This includes restarting bluetoothd, and removing the settings 1695 and cached devices. 1696 """ 1697 reset_on = self.bluetooth_facade.reset_on() 1698 is_powered_on = self._wait_for_condition( 1699 self.bluetooth_facade.is_powered_on, method_name()) 1700 1701 self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on} 1702 return all(self.results.values()) 1703 1704 1705 @test_retry_and_log 1706 def test_reset_off_adapter(self): 1707 """Test that the adapter could be reset off successfully. 1708 1709 This includes restarting bluetoothd, and removing the settings 1710 and cached devices. 1711 """ 1712 reset_off = self.bluetooth_facade.reset_off() 1713 is_powered_off = self._wait_for_condition( 1714 lambda: not self.bluetooth_facade.is_powered_on(), 1715 method_name()) 1716 1717 self.results = { 1718 'reset_off': reset_off, 1719 'is_powered_off': is_powered_off} 1720 return all(self.results.values()) 1721 1722 1723 def test_is_powered_off(self): 1724 """Check if the adapter is powered off.""" 1725 is_powered_off = not self.bluetooth_facade.is_powered_on() 1726 self.results = {'is_powered_off': is_powered_off} 1727 return all(self.results.values()) 1728 1729 1730 @test_retry_and_log(False) 1731 def test_is_facade_valid(self): 1732 """Checks whether the bluetooth facade is in a good state. 1733 1734 If bluetoothd restarts (i.e. due to a crash), the object proxies will no 1735 longer be valid (because the session will be closed). Check whether the 1736 session failed and wait for a new session if it did. 1737 """ 1738 initially_ok = self.bluetooth_facade.is_bluetoothd_valid() 1739 daemon_started = initially_ok or self.bluetooth_facade.start_bluetoothd( 1740 ) 1741 eventually_ok = initially_ok or self.bluetooth_facade.is_bluetoothd_valid( 1742 ) 1743 1744 self.results = { 1745 'initially_ok': initially_ok, 1746 'eventually_ok': eventually_ok, 1747 'daemon_started': daemon_started, 1748 } 1749 return all( 1750 [self.results[x] for x in ['eventually_ok', 'daemon_started']]) 1751 1752 1753 @test_retry_and_log(False) 1754 def test_is_adapter_valid(self): 1755 """Verify the bluetooth adapter is retrievable at test start 1756 1757 @raises: error.TestNAError if we fail to retrieve the adapter on 1758 an unsupported chipset 1759 error.TestFail if we fail to retrieve the adapter on any other 1760 platform 1761 1762 @returns: True if the adapter was located properly 1763 """ 1764 1765 if not self.bluetooth_facade.has_adapter(): 1766 logging.error('No adapter available, rebooting to recover') 1767 1768 self.reboot() 1769 1770 chipset = self.bluetooth_facade.get_chipset_name() 1771 1772 if not chipset: 1773 raise error.TestFail('Unknown adapter is missing') 1774 1775 # A missing adapter is a rare but known issue on several platforms 1776 # that have no vendor support (b/169328792). Since there is no fix 1777 # possible, we forgive these failures by raising a TestNA. 1778 if chipset in UNSUPPORTED_CHIPSETS: 1779 raise error.TestNAError('Unsupported adapter is missing') 1780 1781 raise error.TestFail('Adapter is missing') 1782 1783 return True 1784 1785 @test_retry_and_log 1786 def test_UUIDs(self): 1787 """Test that basic profiles are supported.""" 1788 adapter_UUIDs = self.bluetooth_facade.get_UUIDs() 1789 self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values() 1790 if uuid not in adapter_UUIDs] 1791 return not bool(self.results) 1792 1793 1794 @test_retry_and_log 1795 def test_start_discovery(self): 1796 """Test that the adapter could start discovery.""" 1797 start_discovery, _ = self.bluetooth_facade.start_discovery() 1798 is_discovering = self._wait_for_condition( 1799 self.bluetooth_facade.is_discovering, method_name()) 1800 1801 self.results = { 1802 'start_discovery': start_discovery, 1803 'is_discovering': is_discovering} 1804 return all(self.results.values()) 1805 1806 @test_retry_and_log(False) 1807 def test_is_discovering(self): 1808 """Test that the adapter is already discovering.""" 1809 is_discovering = self._wait_for_condition( 1810 self.bluetooth_facade.is_discovering, method_name()) 1811 1812 self.results = {'is_discovering': is_discovering} 1813 return all(self.results.values()) 1814 1815 @test_retry_and_log 1816 def test_stop_discovery(self): 1817 """Test that the adapter could stop discovery.""" 1818 if not self.bluetooth_facade.is_discovering(): 1819 return True 1820 1821 stop_discovery, _ = self.bluetooth_facade.stop_discovery() 1822 is_not_discovering = self._wait_for_condition( 1823 lambda: not self.bluetooth_facade.is_discovering(), 1824 method_name()) 1825 1826 self.results = { 1827 'stop_discovery': stop_discovery, 1828 'is_not_discovering': is_not_discovering} 1829 return all(self.results.values()) 1830 1831 1832 @test_retry_and_log 1833 def test_discoverable(self): 1834 """Test that the adapter could be set discoverable.""" 1835 set_discoverable = self.bluetooth_facade.set_discoverable(True) 1836 is_discoverable = self._wait_for_condition( 1837 self.bluetooth_facade.is_discoverable, method_name()) 1838 1839 self.results = { 1840 'set_discoverable': set_discoverable, 1841 'is_discoverable': is_discoverable} 1842 return all(self.results.values()) 1843 1844 @test_retry_and_log(False) 1845 def test_is_discoverable(self): 1846 """Test that the adapter is discoverable.""" 1847 is_discoverable = self._wait_for_condition( 1848 self.bluetooth_facade.is_discoverable, method_name()) 1849 1850 self.results = {'is_discoverable': is_discoverable} 1851 return all(self.results.values()) 1852 1853 1854 def _test_timeout_property(self, set_property, check_property, set_timeout, 1855 get_timeout, property_name, 1856 timeout_values = [0, 60, 180]): 1857 """Common method to test (Discoverable/Pairable)Timeout . 1858 1859 This is used to test 1860 - DiscoverableTimeout property 1861 - PairableTimeout property 1862 1863 The test performs the following 1864 - Set PropertyTimeout 1865 - Read PropertyTimeout and make sure values match 1866 - Set adapter propety 1867 - In a loop check if property is active 1868 - Test fails property is false before timeout 1869 - Test fails property is True after timeout 1870 Repeat the test for different values for timeout 1871 1872 Note : Value of 0 mean it never timeouts, so the test will 1873 end after 30 seconds. 1874 """ 1875 def check_timeout(timeout): 1876 """Check for timeout value in loop while recording failures.""" 1877 actual_timeout = get_timeout() 1878 if timeout != actual_timeout: 1879 logging.debug('%s timeout value read %s does not ' 1880 'match value set %s, yet', property_name, 1881 actual_timeout, timeout) 1882 return False 1883 else: 1884 return True 1885 1886 def _test_timeout_property(timeout): 1887 # minium time after timeout before checking property 1888 MIN_DELTA_SECS = 3 1889 # Time between checking property 1890 WAIT_TIME_SECS = 2 1891 1892 # Set and read back the timeout value 1893 if not set_timeout(timeout): 1894 logging.error('Setting the %s timeout failed',property_name) 1895 return False 1896 1897 1898 if not self._wait_for_condition(lambda : check_timeout(timeout), 1899 'check_'+property_name): 1900 logging.error('checking %s_timeout value timed out', 1901 property_name) 1902 return False 1903 1904 # 1905 # Check that the timeout works 1906 # Check property is true until timeout 1907 # and then it is not 1908 1909 property_set = set_property(True) 1910 property_is_true = self._wait_for_condition(check_property, 1911 method_name()) 1912 1913 self.results = { 'set_%s' % property_name : property_set, 1914 'is_%s' % property_name: property_is_true} 1915 logging.debug(self.results) 1916 1917 if not all(self.results.values()): 1918 logging.error('Setting %s failed',property_name) 1919 return False 1920 1921 start_time = time.time() 1922 while True: 1923 time.sleep(WAIT_TIME_SECS) 1924 cur_time = time.time() 1925 property_set = check_property() 1926 time_elapsed = cur_time - start_time 1927 1928 # Ignore check_property results made near the timeout 1929 # to avoid spurious failures. 1930 if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS: 1931 continue 1932 1933 # Timeout of zero seconds mean that the adapter never times out 1934 # Check for 30 seconds and then exit the test. 1935 if timeout == 0: 1936 if not property_set: 1937 logging.error('Adapter is not %s after %.2f ' 1938 'secs with a timeout of zero ', 1939 property_name, time_elapsed) 1940 return False 1941 elif time_elapsed > 30: 1942 logging.debug('Adapter %s after %.2f seconds ' 1943 'with timeout of zero as expected' , 1944 property_name, time_elapsed) 1945 return True 1946 continue 1947 1948 # 1949 # Check if property is true till timeout ends and 1950 # false afterwards 1951 # 1952 if time_elapsed < timeout: 1953 if not property_set: 1954 logging.error('Adapter is not %s after %.2f ' 1955 'secs before timeout of %.2f', 1956 property_name, time_elapsed, timeout) 1957 return False 1958 else: 1959 if property_set: 1960 logging.error('Adapter is still %s after ' 1961 ' %.2f secs with timeout of %.2f', 1962 property_name, time_elapsed, timeout) 1963 return False 1964 else: 1965 logging.debug('Adapter not %s after %.2f ' 1966 'secs with timeout of %.2f as expected ', 1967 property_name, time_elapsed, timeout) 1968 return True 1969 1970 default_value = check_property() 1971 default_timeout = get_timeout() 1972 1973 result = [] 1974 try: 1975 for timeout in timeout_values: 1976 result.append(_test_timeout_property(timeout)) 1977 logging.debug("Test returning %s", all(result)) 1978 return all(result) 1979 except: 1980 logging.error("exception in test_%s_timeout",property_name) 1981 raise 1982 finally: 1983 # Set the property back to default value permanently before 1984 # exiting the test 1985 set_timeout(0) 1986 set_property(default_value) 1987 # Set the timeout back to default value before exiting the test 1988 set_timeout(default_timeout) 1989 1990 1991 @test_retry_and_log 1992 def test_discoverable_timeout(self, timeout_values = [0, 60, 180]): 1993 """Test adapter dbus property DiscoverableTimeout.""" 1994 return self._test_timeout_property( 1995 set_property = self.bluetooth_facade.set_discoverable, 1996 check_property = self.bluetooth_facade.is_discoverable, 1997 set_timeout = self.bluetooth_facade.set_discoverable_timeout, 1998 get_timeout = self.bluetooth_facade.get_discoverable_timeout, 1999 property_name = 'discoverable', 2000 timeout_values = timeout_values) 2001 2002 @test_retry_and_log 2003 def test_pairable_timeout(self, timeout_values = [0, 60, 180]): 2004 """Test adapter dbus property PairableTimeout.""" 2005 return self._test_timeout_property( 2006 set_property = self.bluetooth_facade.set_pairable, 2007 check_property = self.bluetooth_facade.is_pairable, 2008 set_timeout = self.bluetooth_facade.set_pairable_timeout, 2009 get_timeout = self.bluetooth_facade.get_pairable_timeout, 2010 property_name = 'pairable', 2011 timeout_values = timeout_values) 2012 2013 2014 @test_retry_and_log 2015 def test_nondiscoverable(self): 2016 """Test that the adapter could be set non-discoverable.""" 2017 set_nondiscoverable = self.bluetooth_facade.set_discoverable(False) 2018 is_nondiscoverable = self._wait_for_condition( 2019 lambda: not self.bluetooth_facade.is_discoverable(), 2020 method_name()) 2021 2022 self.results = { 2023 'set_nondiscoverable': set_nondiscoverable, 2024 'is_nondiscoverable': is_nondiscoverable} 2025 return all(self.results.values()) 2026 2027 2028 @test_retry_and_log 2029 def test_pairable(self): 2030 """Test that the adapter could be set pairable.""" 2031 set_pairable = self.bluetooth_facade.set_pairable(True) 2032 is_pairable = self._wait_for_condition( 2033 self.bluetooth_facade.is_pairable, method_name()) 2034 2035 self.results = { 2036 'set_pairable': set_pairable, 2037 'is_pairable': is_pairable} 2038 return all(self.results.values()) 2039 2040 2041 @test_retry_and_log 2042 def test_nonpairable(self): 2043 """Test that the adapter could be set non-pairable.""" 2044 set_nonpairable = self.bluetooth_facade.set_pairable(False) 2045 is_nonpairable = self._wait_for_condition( 2046 lambda: not self.bluetooth_facade.is_pairable(), method_name()) 2047 2048 self.results = { 2049 'set_nonpairable': set_nonpairable, 2050 'is_nonpairable': is_nonpairable} 2051 return all(self.results.values()) 2052 2053 2054 @test_retry_and_log(False) 2055 def test_check_valid_adapter_id(self): 2056 """Fail if the Bluetooth ID is not in the correct format. 2057 2058 @returns True if adapter ID follows expected format, False otherwise 2059 """ 2060 2061 device = self.get_base_platform_name() 2062 adapter_info = self.get_adapter_properties() 2063 2064 # Don't complete test if this is a reference board 2065 if device in self.REFERENCE_BOARDS: 2066 return True 2067 2068 modalias = adapter_info['Modalias'] 2069 logging.debug('Saw Bluetooth ID of: %s', modalias) 2070 2071 # Valid Device ID is: 2072 # <00E0(Google)>/<C405(ChromeOS)>/<non-zero versionNumber> 2073 bt_format = 'bluetooth:v00E0pC405d(?!0000)' 2074 2075 if not re.match(bt_format, modalias): 2076 return False 2077 2078 return True 2079 2080 2081 @test_retry_and_log(False) 2082 def test_check_valid_alias(self): 2083 """Fail if the Bluetooth alias is not in the correct format. 2084 2085 @returns True if adapter alias follows expected format, False otherwise 2086 """ 2087 2088 device = self.get_base_platform_name() 2089 adapter_info = self.get_adapter_properties() 2090 2091 # Don't complete test if this is a reference board 2092 if device in self.REFERENCE_BOARDS: 2093 return True 2094 2095 alias = adapter_info['Alias'] 2096 logging.debug('Saw Bluetooth Alias of: %s', alias) 2097 2098 device_type = self.host.get_board_type().lower() 2099 alias_format = '%s_[a-z0-9]{4}' % device_type 2100 2101 self.results = {} 2102 2103 alias_was_correct = True 2104 if not re.match(alias_format, alias.lower()): 2105 alias_was_correct = False 2106 logging.info('unexpected alias %s found', alias) 2107 self.results['alias_found'] = alias 2108 2109 self.results['alias_was_correct'] = alias_was_correct 2110 return all(self.results.values()) 2111 2112 2113 # ------------------------------------------------------------------- 2114 # Tests about general discovering, pairing, and connection 2115 # ------------------------------------------------------------------- 2116 2117 2118 @test_retry_and_log(False) 2119 def test_discover_device(self, 2120 device_address, 2121 start_discovery=True, 2122 stop_discovery=True): 2123 """Test that the adapter could discover the specified device address. 2124 2125 @param device_address: Address of the device. 2126 @param start_discovery: Whether to start discovery. Set to False if you 2127 call start_discovery before calling this. 2128 @param stop_discovery: Whether to stop discovery at the end. If this is 2129 set to False, make sure to call 2130 test_stop_discovery afterwards. 2131 2132 @returns: True if the device is found. False otherwise. 2133 2134 """ 2135 discovery_stopped = False 2136 is_not_discovering = False 2137 device_discovered = False 2138 # If start discovery is not set, discovery must already be started 2139 discovery_started = not start_discovery 2140 has_device = self.bluetooth_facade.has_device 2141 2142 if start_discovery: 2143 if has_device(device_address): 2144 # Before starting a new discovery, remove the found device since 2145 # it is likely to be a temporary device, and we don't know when 2146 # it will be removed by bluez. Therefore, remove it and re-find 2147 # the device to ensure the device object exists for the 2148 # following test, e.g. test_pairing. 2149 logging.debug('Removing device %s to restart temporary timer', 2150 device_address) 2151 self.bluetooth_facade.remove_device_object(device_address) 2152 2153 discovery_started = self.bluetooth_facade.start_discovery() 2154 2155 if discovery_started: 2156 try: 2157 utils.poll_for_condition( 2158 condition=(lambda: has_device(device_address)), 2159 timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS, 2160 sleep_interval=self. 2161 ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 2162 desc='Waiting for discovering %s' % device_address) 2163 device_discovered = True 2164 except utils.TimeoutError as e: 2165 logging.error('test_discover_device: %s', e) 2166 except Exception as e: 2167 logging.error('test_discover_device: %s', e) 2168 err = ('bluetoothd probably crashed.' 2169 'Check out /var/log/messages') 2170 logging.error(err) 2171 except: 2172 logging.error('test_discover_device: unexpected error') 2173 2174 if start_discovery and stop_discovery: 2175 discovery_stopped, _ = self.bluetooth_facade.stop_discovery() 2176 is_not_discovering = self._wait_for_condition( 2177 lambda: not self.bluetooth_facade.is_discovering(), 2178 method_name()) 2179 2180 self.results = { 2181 'should_start_discovery': start_discovery, 2182 'should_stop_discovery': stop_discovery, 2183 'start_discovery': discovery_started, 2184 'stop_discovery': discovery_stopped, 2185 'is_not_discovering': is_not_discovering, 2186 'device_discovered': device_discovered} 2187 2188 # Make sure a discovered device properly started and stopped discovery 2189 device_found = device_discovered and (discovery_stopped 2190 and is_not_discovering 2191 if stop_discovery else True) 2192 2193 return device_found 2194 2195 2196 def _test_discover_by_device(self, device): 2197 return device.Discover(self.bluetooth_facade.address) 2198 2199 @test_retry_and_log(False, messages_start=False, messages_stop=False) 2200 def test_discover_by_device(self, device): 2201 """Test that the device could discover the adapter address. 2202 2203 @param device: Meta device to represent peer device. 2204 2205 @returns: True if the adapter is found by the device. 2206 """ 2207 adapter_discovered = False 2208 discover_by_device = self._test_discover_by_device 2209 discovered_initially = discover_by_device(device) 2210 2211 if not discovered_initially: 2212 try: 2213 utils.poll_for_condition( 2214 condition=(lambda: discover_by_device(device)), 2215 timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS, 2216 sleep_interval= 2217 self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 2218 desc='Waiting for adapter to be discovered') 2219 adapter_discovered = True 2220 except utils.TimeoutError as e: 2221 logging.error('test_discover_by_device: %s', e) 2222 except Exception as e: 2223 logging.error('test_discover_by_device: %s', e) 2224 err = ('bluetoothd probably crashed.' 2225 'Check out /var/log/messages') 2226 logging.error(err) 2227 except: 2228 logging.error('test_discover_by_device: unexpected error') 2229 2230 self.results = { 2231 'adapter_discovered_initially': discovered_initially, 2232 'adapter_discovered': adapter_discovered 2233 } 2234 return any(self.results.values()) 2235 2236 @test_retry_and_log(False, messages_start=False, messages_stop=False) 2237 def test_discover_by_device_fails(self, device): 2238 """Test that the device could not discover the adapter address. 2239 2240 @param device: Meta device to represent peer device. 2241 2242 @returns False if the adapter is found by the device. 2243 """ 2244 self.results = { 2245 'adapter_discovered': self._test_discover_by_device(device) 2246 } 2247 return not any(self.results.values()) 2248 2249 @test_retry_and_log(False, messages_start=False, messages_stop=False) 2250 def test_device_set_discoverable(self, device, discoverable): 2251 """Test that we could set the peer device to discoverable. """ 2252 try: 2253 device.SetDiscoverable(discoverable) 2254 except: 2255 return False 2256 2257 return True 2258 2259 @test_retry_and_log 2260 def test_pairing(self, device_address, pin, trusted=True): 2261 """Test that the adapter could pair with the device successfully. 2262 2263 @param device_address: Address of the device. 2264 @param pin: pin code to pair with the device. 2265 @param trusted: indicating whether to set the device trusted. 2266 2267 @returns: True if pairing succeeds. False otherwise. 2268 2269 """ 2270 2271 def _pair_device(): 2272 """Pair to the device. 2273 2274 @returns: True if it could pair with, connect to, and retrieve 2275 connection info from the device. False otherwise. 2276 2277 """ 2278 self.results['paired'] = self.bluetooth_facade.pair_legacy_device( 2279 device_address, pin, trusted, 2280 self.ADAPTER_PAIRING_TIMEOUT_SECS) 2281 self.results[ 2282 'connected'] = self.bluetooth_facade.device_is_connected( 2283 device_address) 2284 self.results[ 2285 'connection_info_retrievable'] = self.bluetooth_facade.has_connection_info( 2286 device_address) 2287 2288 return self.results['paired'] and self.results[ 2289 'connected'] and self.results['connection_info_retrievable'] 2290 2291 self.results = { 2292 'has_device': False, 2293 'paired': False, 2294 'connected': False, 2295 'connection_info_retrievable': False, 2296 'connection_num': 2297 self.bluetooth_facade.get_num_connected_devices() + 1 2298 } 2299 2300 if self.bluetooth_facade.has_device(device_address): 2301 self.results['has_device'] = True 2302 try: 2303 utils.poll_for_condition( 2304 condition=_pair_device, 2305 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 2306 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2307 desc='Waiting for pairing %s' % device_address) 2308 except utils.TimeoutError as e: 2309 logging.error('test_pairing: %s', e) 2310 except: 2311 logging.error('test_pairing: unexpected error') 2312 2313 return all(self.results.values()) 2314 2315 @test_retry_and_log 2316 def test_remove_pairing(self, device_address): 2317 """Test that the adapter could remove the paired device. 2318 2319 @param device_address: Address of the device. 2320 2321 @returns: True if the device is removed successfully. False otherwise. 2322 2323 """ 2324 device_is_paired_initially = self.bluetooth_facade.device_is_paired( 2325 device_address) 2326 remove_pairing = False 2327 pairing_removed = False 2328 2329 if device_is_paired_initially: 2330 remove_pairing = self.bluetooth_facade.remove_device_object( 2331 device_address) 2332 pairing_removed = not self.bluetooth_facade.device_is_paired( 2333 device_address) 2334 2335 self.results = { 2336 'device_is_paired_initially': device_is_paired_initially, 2337 'remove_pairing': remove_pairing, 2338 'pairing_removed': pairing_removed} 2339 return all(self.results.values()) 2340 2341 2342 def test_set_trusted(self, device_address, trusted=True): 2343 """Test whether the device with the specified address is trusted. 2344 2345 @param device_address: Address of the device. 2346 @param trusted : True or False indicating if trusted is expected. 2347 2348 @returns: True if the device's "Trusted" property is as specified; 2349 False otherwise. 2350 2351 """ 2352 2353 set_trusted = self.bluetooth_facade.set_trusted( 2354 device_address, trusted) 2355 2356 actual_trusted = self.bluetooth_facade.get_device_property( 2357 device_address, 'Trusted') 2358 2359 self.results = { 2360 'set_trusted': set_trusted, 2361 'actual trusted': actual_trusted, 2362 'expected trusted': trusted} 2363 return actual_trusted == trusted 2364 2365 2366 @test_retry_and_log 2367 def test_connection_by_adapter(self, device_address): 2368 """Test that the adapter of dut could connect to the device successfully 2369 2370 It is the caller's responsibility to pair to the device before 2371 doing connection. 2372 2373 @param device_address: Address of the device. 2374 2375 @returns: True if connection is performed. False otherwise. 2376 2377 """ 2378 2379 def _connect_device(): 2380 """Connect to the device. 2381 2382 @returns: True if it could connect to the device. False otherwise. 2383 2384 """ 2385 return self.bluetooth_facade.connect_device(device_address) 2386 2387 2388 has_device = False 2389 connected = False 2390 if self.bluetooth_facade.has_device(device_address): 2391 has_device = True 2392 try: 2393 utils.poll_for_condition( 2394 condition=_connect_device, 2395 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 2396 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2397 desc='Waiting for connecting to %s' % device_address) 2398 connected = True 2399 except utils.TimeoutError as e: 2400 logging.error('test_connection_by_adapter: %s', e) 2401 except: 2402 logging.error('test_connection_by_adapter: unexpected error') 2403 2404 self.results = {'has_device': has_device, 'connected': connected} 2405 return all(self.results.values()) 2406 2407 2408 @test_retry_and_log 2409 def test_disconnection_by_adapter(self, device_address): 2410 """Test that the adapter of dut could disconnect the device successfully 2411 2412 @param device_address: Address of the device. 2413 2414 @returns: True if disconnection is performed. False otherwise. 2415 2416 """ 2417 return self.bluetooth_facade.disconnect_device(device_address) 2418 2419 2420 def _enter_command_mode(self, device): 2421 """Let the device enter command mode. 2422 2423 Before using the device, need to call this method to make sure 2424 it is in the command mode. 2425 2426 @param device: the bluetooth HID device 2427 2428 @returns: True if successful. False otherwise. 2429 2430 """ 2431 result = _is_successful(_run_method(device.EnterCommandMode, 2432 'EnterCommandMode')) 2433 if not result: 2434 logging.error('EnterCommandMode failed') 2435 return result 2436 2437 2438 @test_retry_and_log 2439 def test_connection_by_device( 2440 self, device, post_connection_delay=ADAPTER_HID_INPUT_DELAY): 2441 """Test that the device could connect to the adapter successfully. 2442 2443 This emulates the behavior that a device may initiate a 2444 connection request after waking up from power saving mode. 2445 2446 @param device: the bluetooth HID device 2447 @param post_connection_delay: the delay introduced post connection to 2448 allow profile functionality to be ready 2449 2450 @returns: True if connection is performed correctly by device and 2451 the adapter also enters connection state. 2452 False otherwise. 2453 2454 """ 2455 if not self._enter_command_mode(device): 2456 return False 2457 2458 method_name = 'test_connection_by_device' 2459 connection_by_device = False 2460 adapter_address = self.bluetooth_facade.address 2461 try: 2462 connection_by_device = device.ConnectToRemoteAddress( 2463 adapter_address) 2464 except Exception as e: 2465 logging.error('%s (device): %s', method_name, e) 2466 except: 2467 logging.error('%s (device): unexpected error', method_name) 2468 2469 connection_seen_by_adapter = False 2470 device_address = device.address 2471 device_is_connected = self.bluetooth_facade.device_is_connected 2472 try: 2473 utils.poll_for_condition( 2474 condition=lambda: device_is_connected(device_address), 2475 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 2476 desc=('Waiting for connection from %s' % device_address)) 2477 connection_seen_by_adapter = True 2478 2479 # Although the connect may be complete, it can take a few 2480 # seconds for the input device to be ready for use 2481 time.sleep(post_connection_delay) 2482 except utils.TimeoutError as e: 2483 logging.error('%s (adapter): %s', method_name, e) 2484 except: 2485 logging.error('%s (adapter): unexpected error', method_name) 2486 2487 self.results = { 2488 'connection_by_device': connection_by_device, 2489 'connection_seen_by_adapter': connection_seen_by_adapter} 2490 return all(self.results.values()) 2491 2492 @test_retry_and_log(True, messages_start=False, messages_stop=False) 2493 def test_connection_by_device_only(self, device, adapter_address): 2494 """Test that the device could connect to adapter successfully. 2495 2496 This is a modified version of test_connection_by_device that only 2497 communicates with the peer device and not the host (in case the host is 2498 suspended for example). 2499 2500 @param device: the bluetooth peer device 2501 @param adapter_address: address of the adapter 2502 2503 @returns: True if the connection was established by the device or False. 2504 """ 2505 connected = device.ConnectToRemoteAddress(adapter_address) 2506 if connected: 2507 # Although the connect may be complete, it can take a few 2508 # seconds for the input device to be ready for use 2509 time.sleep(self.ADAPTER_HID_INPUT_DELAY) 2510 2511 self.results = { 2512 'connection_by_device': connected 2513 } 2514 2515 return all(self.results.values()) 2516 2517 2518 @test_retry_and_log 2519 def test_disconnection_by_device(self, device): 2520 """Test that the device could disconnect the adapter successfully. 2521 2522 This emulates the behavior that a device may initiate a 2523 disconnection request before going into power saving mode. 2524 2525 Note: should not try to enter command mode in this method. When 2526 a device is connected, there is no way to enter command mode. 2527 One could just issue a special disconnect command without 2528 entering command mode. 2529 2530 @param device: the bluetooth HID device 2531 2532 @returns: True if disconnection is performed correctly by device and 2533 the adapter also observes the disconnection. 2534 False otherwise. 2535 2536 """ 2537 # TODO(b/182864322) - remove the following statement when the bug 2538 # is fixed. 2539 device.SetRemoteAddress(self.bluetooth_facade.address) 2540 2541 method_name = 'test_disconnection_by_device' 2542 disconnection_by_device = False 2543 try: 2544 device.Disconnect() 2545 disconnection_by_device = True 2546 except Exception as e: 2547 logging.error('%s (device): %s', method_name, e) 2548 except: 2549 logging.error('%s (device): unexpected error', method_name) 2550 2551 disconnection_seen_by_adapter = False 2552 device_address = device.address 2553 device_is_connected = self.bluetooth_facade.device_is_connected 2554 try: 2555 utils.poll_for_condition( 2556 condition=lambda: not device_is_connected(device_address), 2557 timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS, 2558 desc=('Waiting for disconnection from %s' % device_address)) 2559 disconnection_seen_by_adapter = True 2560 except utils.TimeoutError as e: 2561 logging.error('%s (adapter): %s', method_name, e) 2562 except: 2563 logging.error('%s (adapter): unexpected error', method_name) 2564 2565 self.results = { 2566 'disconnection_by_device': disconnection_by_device, 2567 'disconnection_seen_by_adapter': disconnection_seen_by_adapter} 2568 return all(self.results.values()) 2569 2570 2571 @test_retry_and_log(False) 2572 def test_device_is_connected( 2573 self, 2574 device_address, 2575 timeout=ADAPTER_CONNECTION_TIMEOUT_SECS, 2576 sleep_interval=ADAPTER_PAIRING_POLLING_SLEEP_SECS): 2577 """Test that device address given is currently connected. 2578 2579 @param device_address: Address of the device. 2580 @param timeout: maximum number of seconds to wait 2581 @param sleep_interval: time to sleep between polls 2582 2583 @returns: True if the device is connected. 2584 False otherwise. 2585 """ 2586 2587 def _is_connected(): 2588 """Test if device is connected. 2589 2590 @returns: True if device is connected. False otherwise. 2591 2592 """ 2593 return self.bluetooth_facade.device_is_connected(device_address) 2594 2595 method_name = 'test_device_is_connected' 2596 has_device = False 2597 connected = False 2598 if self.bluetooth_facade.has_device(device_address): 2599 has_device = True 2600 try: 2601 utils.poll_for_condition( 2602 condition=_is_connected, 2603 timeout=timeout, 2604 sleep_interval=sleep_interval, 2605 desc='Waiting to check connection to %s' % 2606 device_address) 2607 connected = True 2608 except utils.TimeoutError as e: 2609 logging.error('%s: %s', method_name, e) 2610 except: 2611 logging.error('%s: unexpected error', method_name) 2612 self.results = {'has_device': has_device, 'connected': connected} 2613 return all(self.results.values()) 2614 2615 2616 @test_retry_and_log(False) 2617 def test_device_is_not_connected(self, device_address): 2618 """Test that device address given is NOT currently connected. 2619 2620 @param device_address: Address of the device. 2621 2622 @returns: True if the device is NOT connected. 2623 False otherwise. 2624 2625 """ 2626 2627 def _is_not_connected(): 2628 """Test if device is not connected. 2629 2630 @returns: True if device is not connected. False otherwise. 2631 2632 """ 2633 return not self.bluetooth_facade.device_is_connected( 2634 device_address) 2635 2636 2637 method_name = 'test_device_is_not_connected' 2638 not_connected = False 2639 if self.bluetooth_facade.has_device(device_address): 2640 try: 2641 utils.poll_for_condition( 2642 condition=_is_not_connected, 2643 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 2644 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2645 desc='Waiting to check connection to %s' % 2646 device_address) 2647 not_connected = True 2648 except utils.TimeoutError as e: 2649 logging.error('%s: %s', method_name, e) 2650 except: 2651 logging.error('%s: unexpected error', method_name) 2652 raise 2653 else: 2654 not_connected = True 2655 self.results = {'not_connected': not_connected} 2656 return all(self.results.values()) 2657 2658 2659 @test_retry_and_log 2660 def test_device_is_paired(self, device_address): 2661 """Test that the device address given is currently paired. 2662 2663 @param device_address: Address of the device. 2664 2665 @returns: True if the device is paired. 2666 False otherwise. 2667 2668 """ 2669 def _is_paired(): 2670 """Test if device is paired. 2671 2672 @returns: True if device is paired. False otherwise. 2673 2674 """ 2675 return self.bluetooth_facade.device_is_paired(device_address) 2676 2677 2678 method_name = 'test_device_is_paired' 2679 has_device = False 2680 paired = False 2681 if self.bluetooth_facade.has_device(device_address): 2682 has_device = True 2683 try: 2684 utils.poll_for_condition( 2685 condition=_is_paired, 2686 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 2687 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2688 desc='Waiting for connection to %s' % device_address) 2689 paired = True 2690 except utils.TimeoutError as e: 2691 logging.error('%s: %s', method_name, e) 2692 except: 2693 logging.error('%s: unexpected error', method_name) 2694 self.results = {'has_device': has_device, 'paired': paired} 2695 return all(self.results.values()) 2696 2697 2698 def _get_device_name(self, device_address): 2699 """Get the device name. 2700 2701 @returns: True if the device name is derived. None otherwise. 2702 2703 """ 2704 2705 self.discovered_device_name = self.bluetooth_facade.get_device_property( 2706 device_address, 'Name') 2707 2708 return bool(self.discovered_device_name) 2709 2710 2711 @test_retry_and_log 2712 def test_device_name(self, device_address, expected_device_name): 2713 """Test that the device name discovered by the adapter is correct. 2714 2715 @param device_address: Address of the device. 2716 @param expected_device_name: the bluetooth device name 2717 2718 @returns: True if the discovered_device_name is expected_device_name. 2719 False otherwise. 2720 2721 """ 2722 try: 2723 utils.poll_for_condition( 2724 condition=lambda: self._get_device_name(device_address), 2725 timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS, 2726 sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 2727 desc='Waiting for device name of %s' % device_address) 2728 except utils.TimeoutError as e: 2729 logging.error('test_device_name: %s', e) 2730 except: 2731 logging.error('test_device_name: unexpected error') 2732 2733 self.results = { 2734 'expected_device_name': expected_device_name, 2735 'discovered_device_name': self.discovered_device_name} 2736 return self.discovered_device_name == expected_device_name 2737 2738 2739 @test_retry_and_log 2740 def test_device_class_of_service(self, device_address, 2741 expected_class_of_service): 2742 """Test that the discovered device class of service is as expected. 2743 2744 @param device_address: Address of the device. 2745 @param expected_class_of_service: the expected class of service 2746 2747 @returns: True if the discovered class of service matches the 2748 expected class of service. False otherwise. 2749 2750 """ 2751 2752 device_class = self.bluetooth_facade.get_device_property(device_address, 2753 'Class') 2754 discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK 2755 if device_class else None) 2756 2757 self.results = { 2758 'device_class': device_class, 2759 'expected_class_of_service': expected_class_of_service, 2760 'discovered_class_of_service': discovered_class_of_service} 2761 return discovered_class_of_service == expected_class_of_service 2762 2763 2764 @test_retry_and_log 2765 def test_device_class_of_device(self, device_address, 2766 expected_class_of_device): 2767 """Test that the discovered device class of device is as expected. 2768 2769 @param device_address: Address of the device. 2770 @param expected_class_of_device: the expected class of device 2771 2772 @returns: True if the discovered class of device matches the 2773 expected class of device. False otherwise. 2774 2775 """ 2776 2777 device_class = self.bluetooth_facade.get_device_property(device_address, 2778 'Class') 2779 discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK 2780 if device_class else None) 2781 2782 self.results = { 2783 'device_class': device_class, 2784 'expected_class_of_device': expected_class_of_device, 2785 'discovered_class_of_device': discovered_class_of_device} 2786 return discovered_class_of_device == expected_class_of_device 2787 2788 2789 def _get_btmon_log(self, method, logging_timespan=1): 2790 """Capture the btmon log when executing the specified method. 2791 2792 @param method: the method to capture log. 2793 The method would be executed only when it is not None. 2794 This allows us to simply capture btmon log without 2795 executing any command. 2796 @param logging_timespan: capture btmon log for logging_timespan seconds. 2797 2798 """ 2799 self.bluetooth_le_facade.btmon_start() 2800 self.advertising_msg = method() if method else '' 2801 time.sleep(logging_timespan) 2802 self.bluetooth_le_facade.btmon_stop() 2803 2804 2805 def convert_to_adv_jiffies(self, adv_interval_ms): 2806 """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms. 2807 2808 @param adv_interval_ms: an advertising interval 2809 2810 @returns: the equivalent jiffies 2811 2812 """ 2813 return int(round(adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)) 2814 2815 2816 def compute_duration(self, max_adv_interval_ms): 2817 """Compute duration from max_adv_interval_ms. 2818 2819 Advertising duration is calculated approximately as 2820 duration = max_adv_interval_ms / 1000.0 * 1.1 2821 2822 @param max_adv_interval_ms: max advertising interval in milliseconds. 2823 2824 @returns: duration in seconds. 2825 2826 """ 2827 return max_adv_interval_ms / 1000.0 * 1.1 2828 2829 2830 def compute_logging_timespan(self, max_adv_interval_ms): 2831 """Compute the logging timespan from max_adv_interval_ms. 2832 2833 The logging timespan is the time needed to record btmon log. 2834 2835 @param max_adv_interval_ms: max advertising interval in milliseconds. 2836 2837 @returns: logging_timespan in seconds. 2838 2839 """ 2840 duration = self.compute_duration(max_adv_interval_ms) 2841 logging_timespan = max(self.count_advertisements * duration, 1) 2842 return logging_timespan 2843 2844 2845 @test_retry_and_log(False) 2846 def test_check_duration_and_intervals(self, min_adv_interval_ms, 2847 max_adv_interval_ms, 2848 number_advertisements): 2849 """Verify that every advertisements are scheduled according to the 2850 duration and intervals. 2851 2852 An advertisement would be scheduled at the time span of 2853 duration * number_advertisements 2854 2855 @param min_adv_interval_ms: min advertising interval in milliseconds. 2856 @param max_adv_interval_ms: max advertising interval in milliseconds. 2857 @param number_advertisements: the number of existing advertisements 2858 2859 @returns: True if all advertisements are scheduled based on the 2860 duration and intervals. 2861 2862 """ 2863 2864 2865 def within_tolerance(expected, actual, max_error=0.1): 2866 """Determine if the percent error is within specified tolerance. 2867 2868 @param expected: The expected value. 2869 @param actual: The actual (measured) value. 2870 @param max_error: The maximum percent error acceptable. 2871 2872 @returns: True if the percent error is less than or equal to 2873 max_error. 2874 """ 2875 return abs(expected - actual) / abs(expected) <= max_error 2876 2877 2878 start_str = 'Set Advertising Intervals:' 2879 search_strings = ['HCI Command: LE Set Advertising Data', 'Company'] 2880 search_str = '|'.join(search_strings) 2881 2882 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 2883 start_str=start_str) 2884 2885 # Company string looks like 2886 # Company: not assigned (65283) 2887 company_pattern = re.compile('Company:.*\((\d*)\)') 2888 2889 # The string with timestamp looks like 2890 # < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236 2891 set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)' 2892 set_adv_time_pattern = re.compile(set_adv_time_str) 2893 2894 adv_timestamps = {} 2895 timestamp = None 2896 manufacturer_id = None 2897 for line in contents: 2898 result = set_adv_time_pattern.search(line) 2899 if result: 2900 timestamp = float(result.group(1)) 2901 2902 result = company_pattern.search(line) 2903 if result: 2904 manufacturer_id = '0x%04x' % int(result.group(1)) 2905 2906 if timestamp and manufacturer_id: 2907 if manufacturer_id not in adv_timestamps: 2908 adv_timestamps[manufacturer_id] = [] 2909 adv_timestamps[manufacturer_id].append(timestamp) 2910 timestamp = None 2911 manufacturer_id = None 2912 2913 duration = self.compute_duration(max_adv_interval_ms) 2914 expected_timespan = duration * number_advertisements 2915 2916 check_duration = True 2917 for manufacturer_id, values in six.iteritems(adv_timestamps): 2918 logging.debug('manufacturer_id %s: %s', manufacturer_id, values) 2919 timespans = [values[i] - values[i - 1] 2920 for i in range(1, len(values))] 2921 errors = [timespans[i] for i in range(len(timespans)) 2922 if not within_tolerance(expected_timespan, timespans[i])] 2923 logging.debug('timespans: %s', timespans) 2924 logging.debug('errors: %s', errors) 2925 if bool(errors): 2926 check_duration = False 2927 2928 # Verify that the advertising intervals are also correct. 2929 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 2930 self._verify_advertising_intervals(min_adv_interval_ms, 2931 max_adv_interval_ms)) 2932 2933 self.results = { 2934 'check_duration': check_duration, 2935 'min_adv_interval_ms_found': min_adv_interval_ms_found, 2936 'max_adv_interval_ms_found': max_adv_interval_ms_found, 2937 } 2938 return all(self.results.values()) 2939 2940 2941 def _get_min_max_intervals_strings(self, min_adv_interval_ms, 2942 max_adv_interval_ms): 2943 """Get the min and max advertising intervals strings shown in btmon. 2944 2945 Advertising intervals shown in the btmon log look like 2946 Min advertising interval: 1280.000 msec (0x0800) 2947 Max advertising interval: 1280.000 msec (0x0800) 2948 2949 @param min_adv_interval_ms: min advertising interval in milliseconds. 2950 @param max_adv_interval_ms: max advertising interval in milliseconds. 2951 2952 @returns: the min and max intervals strings. 2953 2954 """ 2955 min_str = ('Min advertising interval: %.3f msec (0x%04x)' % 2956 (min_adv_interval_ms, 2957 self.convert_to_adv_jiffies(min_adv_interval_ms))) 2958 logging.debug('min_adv_interval_ms: %s', min_str) 2959 2960 max_str = ('Max advertising interval: %.3f msec (0x%04x)' % 2961 (max_adv_interval_ms, 2962 self.convert_to_adv_jiffies(max_adv_interval_ms))) 2963 logging.debug('max_adv_interval_ms: %s', max_str) 2964 2965 return (min_str, max_str) 2966 2967 2968 def _verify_advertising_intervals(self, min_adv_interval_ms, 2969 max_adv_interval_ms): 2970 """Verify min and max advertising intervals. 2971 2972 Advertising intervals look like 2973 Min advertising interval: 1280.000 msec (0x0800) 2974 Max advertising interval: 1280.000 msec (0x0800) 2975 2976 @param min_adv_interval_ms: min advertising interval in milliseconds. 2977 @param max_adv_interval_ms: max advertising interval in milliseconds. 2978 2979 @returns: a tuple of (True, True) if both min and max advertising 2980 intervals could be found. Otherwise, the corresponding element 2981 in the tuple if False. 2982 2983 """ 2984 min_str, max_str = self._get_min_max_intervals_strings( 2985 min_adv_interval_ms, max_adv_interval_ms) 2986 2987 min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str) 2988 max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str) 2989 2990 return min_adv_interval_ms_found, max_adv_interval_ms_found 2991 2992 2993 def _verify_scan_response_data(self, adv_data): 2994 """Verify advertisement's scan response data is correct 2995 2996 Unlike the other fixed advertising fields, Scan Response Data is set 2997 in a tag-value data format. This function helps verify the data format 2998 for specific tag values to ensure scan response was propagated correctly 2999 3000 @param adv_data: Dictionary defining advertising fields to be registered 3001 with bluetoothd daemon's RegisterAdvertisement interface 3002 3003 @returns: True if all Registered Scan Response tags were located in 3004 btmon trace, False otherwise 3005 """ 3006 3007 scan_rsp = adv_data.get('ScanResponseData') 3008 if not scan_rsp: 3009 return True 3010 3011 for tag, data in scan_rsp.items(): 3012 # Validate 16 bit Service Data tag 3013 if int(tag, 16) == 0x16: 3014 # First two bytes of data are endian-corrected UUID, followed 3015 # by service data 3016 uuid = '%x%x' % (data[1], data[0]) 3017 data_str = ''.join( 3018 ['%02x' % data[i] for i in range(2, len(data))]) 3019 3020 # Service data has the following format in btmon trace: 3021 # Service Data (UUID 0xfef3): 01020304 3022 search_str = 'Service Data (UUID 0x{}): {}'.format( 3023 uuid, data_str) 3024 3025 # Fail if data can't be located in btmon trace 3026 if not self.bluetooth_le_facade.btmon_find(search_str): 3027 return False 3028 3029 return True 3030 3031 def test_advertising_flags(self, flag_strs=[]): 3032 """Verify that advertising flags are set in registered advertisement 3033 3034 Each flag has a specific descriptor that appears in btmon trace. This 3035 simple checker validates that the desired flag descriptors appear in 3036 btmon trace when the advertisement was registered. 3037 3038 @param flag_strs: Flag string descriptors expected in btmon trace 3039 3040 #returns: True if all flag descriptors were located, False otherwise 3041 """ 3042 3043 for flag_str in flag_strs: 3044 if not self.bluetooth_le_facade.btmon_find(flag_str): 3045 logging.info( 3046 'Flag descriptor not located: {}'.format(flag_str)) 3047 return False 3048 3049 return True 3050 3051 def ext_adv_enabled(self): 3052 """ Check if platform supports extended advertising 3053 3054 @returns True if extended advertising is supported, else False 3055 """ 3056 3057 adv_features = self.bluetooth_facade.get_advertising_manager_property( 3058 'SupportedFeatures') 3059 3060 return 'HardwareOffload' in adv_features 3061 3062 def _verify_adv_tx_power(self, advertising_data): 3063 """ Verify that advertisement uses Tx Power correctly via the following: 3064 3065 1. Confirm the correct Tx Power is propagated in both MGMT and 3066 HCI commands. 3067 2. Validate that the Tx Power selected by the controller is 3068 returned to the client via dbus. 3069 3070 @param: advertising_data: dictionary of advertising data properties 3071 used to register the advertisement 3072 3073 @returns: True if the above requirements are met, False otherwise 3074 """ 3075 3076 # If we aren't using TxPower in this advertisement, success 3077 if not self.ext_adv_enabled() or 'TxPower' not in advertising_data: 3078 return True 3079 3080 # Make sure the correct Tx power was passed in both MGMT and HCI 3081 # commands by searching for two instances of search string 3082 search_str = 'TX power: {} dbm'.format(advertising_data['TxPower']) 3083 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 3084 start_str='') 3085 if len(contents) < 2: 3086 logging.error('Could not locate correct Tx power in MGMT and HCI') 3087 return False 3088 3089 # Locate tx power selected by controller 3090 search_str = 'TX power \(selected\)' 3091 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 3092 start_str='') 3093 3094 if not contents: 3095 logging.error('No Tx Power selected event found, failing') 3096 return False 3097 3098 # The line we want has the following structure: 3099 # 'TX power (selected): -5 dbm (0x07)' 3100 # We locate the number before 'dbm' 3101 items = contents[0].split(' ') 3102 selected_tx_power = int(items[items.index('dbm') - 1]) 3103 3104 # Validate that client's advertisement was updated correctly. 3105 new_tx_prop = self.bluetooth_le_facade.get_advertisement_property( 3106 advertising_data['Path'], 'TxPower') 3107 3108 return new_tx_prop == selected_tx_power 3109 3110 @test_retry_and_log(False) 3111 def test_register_advertisement(self, advertisement_data, instance_id): 3112 """Verify that an advertisement is registered correctly. 3113 3114 This test verifies the following data: 3115 - advertisement added 3116 - manufacturer data 3117 - service UUIDs 3118 - service data 3119 - advertising intervals 3120 - advertising enabled 3121 - Tx power set (if extended advertising available) 3122 3123 @param advertisement_data: the data of an advertisement to register. 3124 @param instance_id: the instance id which starts at 1. 3125 3126 @returns: True if the advertisement is registered correctly. 3127 False otherwise. 3128 3129 """ 3130 3131 # We need to know the intervals used to verify later. If advertisement 3132 # structure contains it, use them. Otherwise, use bluez's defaults 3133 if set(advertisement_data) >= {'MinInterval', 'MaxInterval'}: 3134 min_adv_interval_ms = advertisement_data['MinInterval'] 3135 max_adv_interval_ms = advertisement_data['MaxInterval'] 3136 3137 else: 3138 min_adv_interval_ms = self.DEFAULT_MIN_ADVERTISEMENT_INTERVAL_MS 3139 max_adv_interval_ms = self.DEFAULT_MAX_ADVERTISEMENT_INTERVAL_MS 3140 3141 # When registering a new advertisement, it is possible that another 3142 # instance is advertising. It may need to wait for all other 3143 # advertisements to complete advertising once. 3144 self.count_advertisements += 1 3145 logging_timespan = self.compute_logging_timespan(max_adv_interval_ms) 3146 self._get_btmon_log( 3147 lambda: self.bluetooth_le_facade.register_advertisement( 3148 advertisement_data), 3149 logging_timespan=logging_timespan) 3150 3151 # _get_btmon_log will store the return value of the registration request 3152 # in self.advertising_msg. If the request was successful, the return 3153 # value was an empty string 3154 registration_succeeded = (self.advertising_msg == '') 3155 3156 # Verify that a new advertisement is added. 3157 advertisement_added = ( 3158 self.bluetooth_le_facade.btmon_find('Advertising Added') and 3159 self.bluetooth_le_facade.btmon_find('Instance: %d' % 3160 instance_id)) 3161 3162 # Verify that the manufacturer data could be found. 3163 manufacturer_data = advertisement_data.get('ManufacturerData', '') 3164 manufacturer_data_found = True 3165 for manufacturer_id in manufacturer_data: 3166 # The 'not assigned' text below means the manufacturer id 3167 # is not actually assigned to any real manufacturer. 3168 # For real 16-bit manufacturer UUIDs, refer to 3169 # https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members 3170 manufacturer_data_found = self.bluetooth_le_facade.btmon_find( 3171 'Company: not assigned (%d)' % int(manufacturer_id, 16)) 3172 3173 # Verify that all service UUIDs could be found. 3174 service_uuids_found = True 3175 for uuid in advertisement_data.get('ServiceUUIDs', []): 3176 # Service UUIDs looks like ['0x180D', '0x180F'] 3177 # Heart Rate (0x180D) 3178 # Battery Service (0x180F) 3179 # For actual 16-bit service UUIDs, refer to 3180 # https://www.bluetooth.com/specifications/gatt/services 3181 if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid): 3182 service_uuids_found = False 3183 break 3184 3185 # Verify service data. 3186 service_data_found = True 3187 for uuid, data in advertisement_data.get('ServiceData', {}).items(): 3188 # A service data looks like 3189 # Service Data (UUID 0x9999): 0001020304 3190 # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04] 3191 data_str = ''.join(['%02x' % n for n in data]) 3192 if not self.bluetooth_le_facade.btmon_find( 3193 'Service Data (UUID 0x%s): %s' % (uuid, data_str)): 3194 service_data_found = False 3195 break 3196 3197 # Broadcast advertisements are overwritten in some kernel versions to 3198 # be more aggressive. Verify that the advertising intervals are correct 3199 # if this mode is not used 3200 if advertisement_data.get('Type') != 'broadcast': 3201 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 3202 self._verify_advertising_intervals(min_adv_interval_ms, 3203 max_adv_interval_ms)) 3204 3205 else: 3206 min_adv_interval_ms_found = True 3207 max_adv_interval_ms_found = True 3208 3209 scan_rsp_correct = self._verify_scan_response_data(advertisement_data) 3210 3211 # Verify advertising is enabled. 3212 advertising_enabled = self.bluetooth_le_facade.btmon_find( 3213 'Advertising: Enabled (0x01)') 3214 3215 # Verify new APIs were used 3216 new_apis_used = self.bluetooth_le_facade.btmon_find( 3217 'Add Extended Advertising Parameters') 3218 3219 tx_power_correct = self._verify_adv_tx_power(advertisement_data) 3220 3221 self.results = { 3222 'registration_succeeded': registration_succeeded, 3223 'advertisement_added': advertisement_added, 3224 'manufacturer_data_found': manufacturer_data_found, 3225 'service_uuids_found': service_uuids_found, 3226 'service_data_found': service_data_found, 3227 'min_adv_interval_ms_found': min_adv_interval_ms_found, 3228 'max_adv_interval_ms_found': max_adv_interval_ms_found, 3229 'scan_rsp_correct': scan_rsp_correct, 3230 'advertising_enabled': advertising_enabled, 3231 'new_apis_used': new_apis_used, 3232 'tx_power_correct': tx_power_correct, 3233 } 3234 return all(self.results.values()) 3235 3236 3237 @test_retry_and_log(False) 3238 def test_fail_to_register_advertisement(self, advertisement_data, 3239 min_adv_interval_ms, 3240 max_adv_interval_ms): 3241 """Verify that failure is incurred when max advertisements are reached. 3242 3243 This test verifies that a registration failure is incurred when 3244 max advertisements are reached. The error message looks like: 3245 3246 org.bluez.Error.Failed: Maximum advertisements reached 3247 3248 @param advertisement_data: the advertisement to register. 3249 @param min_adv_interval_ms: min_adv_interval in milliseconds. 3250 @param max_adv_interval_ms: max_adv_interval in milliseconds. 3251 3252 @returns: True if the error message is received correctly. 3253 False otherwise. 3254 3255 """ 3256 logging_timespan = self.compute_logging_timespan(max_adv_interval_ms) 3257 self._get_btmon_log( 3258 lambda: self.bluetooth_le_facade.register_advertisement( 3259 advertisement_data), 3260 logging_timespan=logging_timespan) 3261 3262 # Verify that it failed to register advertisement due to the fact 3263 # that max advertisements are reached. 3264 failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT 3265 in self.advertising_msg) 3266 3267 # Verify that no new advertisement is added. 3268 advertisement_not_added = not self.bluetooth_le_facade.btmon_find( 3269 'Advertising Added:') 3270 3271 self.results = { 3272 'failed_to_register_error': failed_to_register_error, 3273 'advertisement_not_added': advertisement_not_added, 3274 } 3275 3276 # If the registration fails and extended advertising is available, 3277 # there will be no events in btmon. Therefore, we only run this part of 3278 # the test if extended advertising is not available, indicating that 3279 # software advertisement rotation is being used. 3280 if not self.ext_adv_enabled(): 3281 # Verify that the advertising intervals are correct. 3282 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 3283 self._verify_advertising_intervals(min_adv_interval_ms, 3284 max_adv_interval_ms)) 3285 3286 # Verify advertising remains enabled. 3287 advertising_enabled = self.bluetooth_le_facade.btmon_find( 3288 'Advertising: Enabled (0x01)') 3289 3290 self.results.update({ 3291 'min_adv_interval_ms_found': min_adv_interval_ms_found, 3292 'max_adv_interval_ms_found': max_adv_interval_ms_found, 3293 'advertising_enabled': advertising_enabled, 3294 }) 3295 3296 return all(self.results.values()) 3297 3298 3299 @test_retry_and_log(False) 3300 def test_unregister_advertisement(self, advertisement_data, instance_id, 3301 advertising_disabled): 3302 """Verify that an advertisement is unregistered correctly. 3303 3304 This test verifies the following data: 3305 - advertisement removed 3306 - advertising status: enabled if there are advertisements left; 3307 disabled otherwise. 3308 3309 @param advertisement_data: the data of an advertisement to unregister. 3310 @param instance_id: the instance id of the advertisement to remove. 3311 @param advertising_disabled: is advertising disabled? This happens 3312 only when all advertisements are removed. 3313 3314 @returns: True if the advertisement is unregistered correctly. 3315 False otherwise. 3316 3317 """ 3318 self.count_advertisements -= 1 3319 self._get_btmon_log( 3320 lambda: self.bluetooth_le_facade.unregister_advertisement( 3321 advertisement_data)) 3322 3323 # Verify that the advertisement is removed. 3324 advertisement_removed = ( 3325 self.bluetooth_le_facade.btmon_find('Advertising Removed') and 3326 self.bluetooth_le_facade.btmon_find('Instance: %d' % 3327 instance_id)) 3328 3329 # If advertising_disabled is True, there should be no log like 3330 # 'Advertising: Enabled (0x01)' 3331 # If advertising_disabled is False, there should be log like 3332 # 'Advertising: Enabled (0x01)' 3333 3334 # Only need to check advertising status when the last advertisement 3335 # is removed. For any other advertisements prior to the last one, 3336 # we may or may not observe 'Advertising: Enabled (0x01)' message. 3337 # Hence, the test would become flaky if we insist to see that message. 3338 # A possible workaround is to sleep for a while and then check the 3339 # message. The drawback is that we may need to wait up to 10 seconds 3340 # if the advertising duration and intervals are long. 3341 # In a test case, we always run test_check_duration_and_intervals() 3342 # to check if advertising duration and intervals are correct after 3343 # un-registering one or all advertisements, it is safe to do so. 3344 advertising_enabled_found = self.bluetooth_le_facade.btmon_find( 3345 'Advertising: Enabled (0x01)') 3346 advertising_disabled_found = self.bluetooth_le_facade.btmon_find( 3347 'Advertising: Disabled (0x00)') 3348 advertising_status_correct = not advertising_disabled or ( 3349 advertising_disabled_found and not advertising_enabled_found) 3350 3351 self.results = { 3352 'advertisement_removed': advertisement_removed, 3353 'advertising_status_correct': advertising_status_correct, 3354 } 3355 return all(self.results.values()) 3356 3357 3358 @test_retry_and_log(False) 3359 def test_set_advertising_intervals(self, min_adv_interval_ms, 3360 max_adv_interval_ms): 3361 """Verify that new advertising intervals are set correctly. 3362 3363 Note that setting advertising intervals does not enable/disable 3364 advertising. Hence, there is no need to check the advertising 3365 status. 3366 3367 @param min_adv_interval_ms: the min advertising interval in ms. 3368 @param max_adv_interval_ms: the max advertising interval in ms. 3369 3370 @returns: True if the new advertising intervals are correct. 3371 False otherwise. 3372 3373 """ 3374 self._get_btmon_log( 3375 lambda: self.bluetooth_le_facade.set_advertising_intervals( 3376 min_adv_interval_ms, max_adv_interval_ms)) 3377 3378 # Verify the new advertising intervals. 3379 # With intervals of 200 ms and 200 ms, the log looks like 3380 # bluetoothd: Set Advertising Intervals: 0x0140, 0x0140 3381 txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' 3382 adv_intervals_found = self.bluetooth_le_facade.btmon_find( 3383 txt % (self.convert_to_adv_jiffies(min_adv_interval_ms), 3384 self.convert_to_adv_jiffies(max_adv_interval_ms))) 3385 3386 self.results = {'adv_intervals_found': adv_intervals_found} 3387 return all(self.results.values()) 3388 3389 3390 @test_retry_and_log(False) 3391 def test_fail_to_set_advertising_intervals( 3392 self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms, 3393 orig_min_adv_interval_ms, orig_max_adv_interval_ms): 3394 """Verify that setting invalid advertising intervals results in error. 3395 3396 If invalid min/max advertising intervals are given, it would incur 3397 the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'. 3398 Note that valid advertising intervals fall between 20 ms and 10,240 ms. 3399 3400 @param invalid_min_adv_interval_ms: the invalid min advertising interval 3401 in ms. 3402 @param invalid_max_adv_interval_ms: the invalid max advertising interval 3403 in ms. 3404 @param orig_min_adv_interval_ms: the original min advertising interval 3405 in ms. 3406 @param orig_max_adv_interval_ms: the original max advertising interval 3407 in ms. 3408 3409 @returns: True if it fails to set invalid advertising intervals. 3410 False otherwise. 3411 3412 """ 3413 self._get_btmon_log( 3414 lambda: self.bluetooth_le_facade.set_advertising_intervals( 3415 invalid_min_adv_interval_ms, 3416 invalid_max_adv_interval_ms)) 3417 3418 # Verify that the invalid error is observed in the dbus error callback 3419 # message. 3420 invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in 3421 self.advertising_msg) 3422 3423 # Verify that the min/max advertising intervals remain the same 3424 # after setting the invalid advertising intervals. 3425 # 3426 # In btmon log, we would see the following message first. 3427 # bluetoothd: Set Advertising Intervals: 0x0010, 0x0010 3428 # And then, we should check if "Min advertising interval" and 3429 # "Max advertising interval" remain the same. 3430 start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % ( 3431 self.convert_to_adv_jiffies(invalid_min_adv_interval_ms), 3432 self.convert_to_adv_jiffies(invalid_max_adv_interval_ms)) 3433 3434 search_strings = ['Min advertising interval:', 3435 'Max advertising interval:'] 3436 search_str = '|'.join(search_strings) 3437 3438 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 3439 start_str=start_str) 3440 3441 # The min/max advertising intervals of all advertisements should remain 3442 # the same as the previous valid ones. 3443 min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec' 3444 min_max_pattern = re.compile(min_max_str) 3445 correct_orig_min_adv_interval = True 3446 correct_orig_max_adv_interval = True 3447 for line in contents: 3448 result = min_max_pattern.search(line) 3449 if result: 3450 interval = float(result.group(1)) 3451 if 'Min' in line and interval != orig_min_adv_interval_ms: 3452 correct_orig_min_adv_interval = False 3453 elif 'Max' in line and interval != orig_max_adv_interval_ms: 3454 correct_orig_max_adv_interval = False 3455 3456 self.results = { 3457 'invalid_intervals_error': invalid_intervals_error, 3458 'correct_orig_min_adv_interval': correct_orig_min_adv_interval, 3459 'correct_orig_max_adv_interval': correct_orig_max_adv_interval} 3460 3461 return all(self.results.values()) 3462 3463 3464 @test_retry_and_log(False) 3465 def test_check_advertising_intervals(self, min_adv_interval_ms, 3466 max_adv_interval_ms): 3467 """Verify that the advertising intervals are as expected. 3468 3469 @param min_adv_interval_ms: the min advertising interval in ms. 3470 @param max_adv_interval_ms: the max advertising interval in ms. 3471 3472 @returns: True if the advertising intervals are correct. 3473 False otherwise. 3474 3475 """ 3476 self._get_btmon_log(None) 3477 3478 # Verify that the advertising intervals are correct. 3479 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 3480 self._verify_advertising_intervals(min_adv_interval_ms, 3481 max_adv_interval_ms)) 3482 3483 self.results = { 3484 'min_adv_interval_ms_found': min_adv_interval_ms_found, 3485 'max_adv_interval_ms_found': max_adv_interval_ms_found, 3486 } 3487 return all(self.results.values()) 3488 3489 3490 @test_retry_and_log(False) 3491 def test_reset_advertising(self, instance_ids=[]): 3492 """Verify that advertising is reset correctly. 3493 3494 Note that reset advertising would set advertising intervals to 3495 the default values. However, we would not be able to observe 3496 the values change until new advertisements are registered. 3497 Therefore, it is required that a test_register_advertisement() 3498 test is conducted after this test. 3499 3500 If instance_ids is [], all advertisements would still be removed 3501 if there are any. However, no need to check 'Advertising Removed' 3502 in btmon log since we may or may not be able to observe the message. 3503 This feature is needed if this test is invoked as the first one in 3504 a test case to reset advertising. In this situation, this test does 3505 not know how many advertisements exist. 3506 3507 @param instance_ids: the list of instance IDs that should be removed. 3508 3509 @returns: True if advertising is reset correctly. 3510 False otherwise. 3511 3512 """ 3513 self.count_advertisements = 0 3514 self._get_btmon_log( 3515 lambda: self.bluetooth_le_facade.reset_advertising()) 3516 3517 # Verify that every advertisement is removed. When an advertisement 3518 # with instance id 1 is removed, the log looks like 3519 # Advertising Removed 3520 # instance: 1 3521 if len(instance_ids) > 0: 3522 advertisement_removed = self.bluetooth_le_facade.btmon_find( 3523 'Advertising Removed') 3524 if advertisement_removed: 3525 for instance_id in instance_ids: 3526 txt = 'Instance: %d' % instance_id 3527 if not self.bluetooth_le_facade.btmon_find(txt): 3528 advertisement_removed = False 3529 break 3530 else: 3531 advertisement_removed = True 3532 3533 if not advertisement_removed: 3534 logging.error('Failed to remove advertisement') 3535 3536 # Verify the advertising is disabled. 3537 advertising_disabled_observied = self.bluetooth_le_facade.btmon_find( 3538 'Advertising: Disabled') 3539 # If there are no existing advertisements, we may not observe 3540 # 'Advertising: Disabled'. 3541 advertising_disabled = (instance_ids == [] or 3542 advertising_disabled_observied) 3543 3544 self.results = { 3545 'advertisement_removed': advertisement_removed, 3546 'advertising_disabled': advertising_disabled, 3547 } 3548 return all(self.results.values()) 3549 3550 3551 @test_retry_and_log(False) 3552 def test_receive_advertisement(self, address=None, UUID=None, timeout=10): 3553 """Verifies that we receive an advertisement with specific contents 3554 3555 Since test_discover_device only uses the existence of a device dbus path 3556 to indicate when a device is discovered, it is not adequate if we want 3557 to verify that we have received an advertisement from a device. This 3558 test monitors btmon around a discovery instance and searches for the 3559 relevant advertising report. 3560 3561 @param address: String address of peer 3562 @param UUID: String of hex data 3563 @param timeout: seconds to listen for traffic 3564 3565 @returns True if report was located, otherwise False 3566 """ 3567 3568 def _discover_devices(): 3569 self.test_start_discovery() 3570 time.sleep(timeout) 3571 self.test_stop_discovery() 3572 3573 # Run discovery, record btmon log 3574 self._get_btmon_log(_discover_devices) 3575 3576 # Grab all logs received 3577 btmon_log = '\n'.join(self.bluetooth_le_facade.btmon_get('', '')) 3578 3579 desired_strs = [] 3580 3581 if address is not None: 3582 desired_strs.append('Address: {}'.format(address)) 3583 3584 if UUID is not None: 3585 desired_strs.append('({})'.format(UUID)) 3586 3587 # Split btmon events by HCI and MGMT delimiters 3588 event_delimiter = '|'.join(['@ MGMT', '> HCI', '< HCI']) 3589 btmon_events = re.split(event_delimiter, btmon_log) 3590 3591 features_located = False 3592 3593 for event_str in btmon_events: 3594 if 'Advertising Report' not in event_str: 3595 continue 3596 3597 for desired_str in desired_strs: 3598 if desired_str not in event_str: 3599 break 3600 3601 else: 3602 features_located = True 3603 3604 self.results = { 3605 'features_located': features_located, 3606 } 3607 return all(self.results.values()) 3608 3609 3610 def add_device(self, address, address_type, action): 3611 """Add a device to the Kernel action list.""" 3612 return self.bluetooth_facade.add_device(address, address_type, action) 3613 3614 3615 def remove_device(self, address, address_type): 3616 """Remove a device from the Kernel action list.""" 3617 return self.bluetooth_facade.remove_device(address,address_type) 3618 3619 3620 def read_supported_commands(self): 3621 """Read the set of supported commands from the Kernel.""" 3622 return self.bluetooth_facade.read_supported_commands() 3623 3624 3625 def read_info(self): 3626 """Read the adapter information from the Kernel.""" 3627 return self.bluetooth_facade.read_info() 3628 3629 3630 def get_adapter_properties(self): 3631 """Read the adapter properties from the Bluetooth Daemon.""" 3632 return self.bluetooth_facade.get_adapter_properties() 3633 3634 3635 def get_dev_info(self): 3636 """Read raw HCI device information.""" 3637 return self.bluetooth_facade.get_dev_info() 3638 3639 def log_settings(self, msg, settings): 3640 """function convert MGMT_OP_READ_INFO settings to string 3641 3642 @param msg: string to include in output 3643 @param settings: bitstring returned by MGMT_OP_READ_INFO 3644 @return : List of strings indicating different settings 3645 """ 3646 strs = [] 3647 if settings & bluetooth_socket.MGMT_SETTING_POWERED: 3648 strs.append("POWERED") 3649 if settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE: 3650 strs.append("CONNECTABLE") 3651 if settings & bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE: 3652 strs.append("FAST-CONNECTABLE") 3653 if settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE: 3654 strs.append("DISCOVERABLE") 3655 if settings & bluetooth_socket.MGMT_SETTING_PAIRABLE: 3656 strs.append("PAIRABLE") 3657 if settings & bluetooth_socket.MGMT_SETTING_LINK_SECURITY: 3658 strs.append("LINK-SECURITY") 3659 if settings & bluetooth_socket.MGMT_SETTING_SSP: 3660 strs.append("SSP") 3661 if settings & bluetooth_socket.MGMT_SETTING_BREDR: 3662 strs.append("BR/EDR") 3663 if settings & bluetooth_socket.MGMT_SETTING_HS: 3664 strs.append("HS") 3665 if settings & bluetooth_socket.MGMT_SETTING_LE: 3666 strs.append("LE") 3667 logging.debug('%s : %s', msg, " ".join(strs)) 3668 return strs 3669 3670 def log_flags(self, msg, flags): 3671 """Function to convert HCI state configuration to a string 3672 3673 @param msg: string to include in output 3674 @param settings: bitstring returned by get_dev_info 3675 @return : List of strings indicating different flags 3676 """ 3677 strs = [] 3678 if flags & bluetooth_socket.HCI_UP: 3679 strs.append("UP") 3680 else: 3681 strs.append("DOWN") 3682 if flags & bluetooth_socket.HCI_INIT: 3683 strs.append("INIT") 3684 if flags & bluetooth_socket.HCI_RUNNING: 3685 strs.append("RUNNING") 3686 if flags & bluetooth_socket.HCI_PSCAN: 3687 strs.append("PSCAN") 3688 if flags & bluetooth_socket.HCI_ISCAN: 3689 strs.append("ISCAN") 3690 if flags & bluetooth_socket.HCI_AUTH: 3691 strs.append("AUTH") 3692 if flags & bluetooth_socket.HCI_ENCRYPT: 3693 strs.append("ENCRYPT") 3694 if flags & bluetooth_socket.HCI_INQUIRY: 3695 strs.append("INQUIRY") 3696 if flags & bluetooth_socket.HCI_RAW: 3697 strs.append("RAW") 3698 logging.debug('%s [HCI]: %s', msg, " ".join(strs)) 3699 return strs 3700 3701 3702 @test_retry_and_log(False) 3703 def test_service_resolved(self, address): 3704 """Test that the services under device address can be resolved 3705 3706 @param address: MAC address of a device 3707 3708 @returns: True if the ServicesResolved property is changed before 3709 timeout, False otherwise. 3710 3711 """ 3712 is_resolved_func = self.bluetooth_facade.device_services_resolved 3713 return self._wait_for_condition(lambda : is_resolved_func(address),\ 3714 method_name()) 3715 3716 3717 @test_retry_and_log(False) 3718 def test_gatt_browse(self, address): 3719 """Test that the GATT client can get the attributes correctly 3720 3721 @param address: MAC address of a device 3722 3723 @returns: True if the attribute map received by GATT client is the same 3724 as expected. False otherwise. 3725 3726 """ 3727 3728 gatt_client_facade = GATT_ClientFacade(self.bluetooth_facade) 3729 actual_app = gatt_client_facade.browse(address) 3730 expected_app = GATT_HIDApplication() 3731 diff = GATT_Application.diff(actual_app, expected_app) 3732 3733 self.result = { 3734 'actural_result': actual_app, 3735 'expected_result': expected_app 3736 } 3737 3738 gatt_attribute_hierarchy = ['Device', 'Service', 'Characteristic', 3739 'Descriptor'] 3740 # Remove any difference in object path 3741 for parent, child in zip(gatt_attribute_hierarchy, 3742 gatt_attribute_hierarchy[1:]): 3743 pattern = re.compile('^%s .* is different in %s' % (child, parent)) 3744 for diff_str in diff[::]: 3745 if pattern.search(diff_str): 3746 diff.remove(diff_str) 3747 3748 if len(diff) != 0: 3749 logging.error('Application Diff: %s', diff) 3750 return False 3751 return True 3752 3753 3754 def _record_input_events(self, device, gesture, address=None): 3755 """Record the input events. 3756 3757 @param device: the bluetooth HID device. 3758 @param gesture: the gesture method to perform. 3759 3760 @returns: the input events received on the DUT. 3761 3762 """ 3763 self.input_facade.initialize_input_recorder(device.name, uniq=address) 3764 self.input_facade.start_input_recorder(device.name) 3765 time.sleep(self.HID_REPORT_SLEEP_SECS) 3766 gesture() 3767 time.sleep(self.HID_REPORT_SLEEP_SECS) 3768 self.input_facade.stop_input_recorder(device.name) 3769 time.sleep(self.HID_REPORT_SLEEP_SECS) 3770 event_values = self.input_facade.get_input_events(device.name) 3771 events = [Event(*ev) for ev in event_values] 3772 return events 3773 3774 3775 # ------------------------------------------------------------------- 3776 # Bluetooth mouse related tests 3777 # ------------------------------------------------------------------- 3778 3779 3780 def _test_mouse_click(self, device, button): 3781 """Test that the mouse click events could be received correctly. 3782 3783 @param device: the meta device containing a bluetooth HID device 3784 @param button: which button to test, 'LEFT' or 'RIGHT' 3785 3786 @returns: True if the report received by the host matches the 3787 expected one. False otherwise. 3788 3789 """ 3790 if button == 'LEFT': 3791 gesture = device.LeftClick 3792 elif button == 'RIGHT': 3793 gesture = device.RightClick 3794 else: 3795 raise error.TestError('Button (%s) is not valid.' % button) 3796 3797 actual_events = self._record_input_events(device, 3798 gesture, 3799 address=device.address) 3800 3801 linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT} 3802 expected_events = [ 3803 # Button down 3804 recorder.MSC_SCAN_BTN_EVENT[button], 3805 Event(EV_KEY, linux_input_button[button], 1), 3806 recorder.SYN_EVENT, 3807 # Button up 3808 recorder.MSC_SCAN_BTN_EVENT[button], 3809 Event(EV_KEY, linux_input_button[button], 0), 3810 recorder.SYN_EVENT] 3811 3812 self.results = { 3813 'actual_events': list(map(str, actual_events)), 3814 'expected_events': list(map(str, expected_events))} 3815 return actual_events == expected_events 3816 3817 3818 @test_retry_and_log 3819 def test_mouse_left_click(self, device): 3820 """Test that the mouse left click events could be received correctly. 3821 3822 @param device: the meta device containing a bluetooth HID device 3823 3824 @returns: True if the report received by the host matches the 3825 expected one. False otherwise. 3826 3827 """ 3828 return self._test_mouse_click(device, 'LEFT') 3829 3830 3831 @test_retry_and_log 3832 def test_mouse_right_click(self, device): 3833 """Test that the mouse right click events could be received correctly. 3834 3835 @param device: the meta device containing a bluetooth HID device 3836 3837 @returns: True if the report received by the host matches the 3838 expected one. False otherwise. 3839 3840 """ 3841 return self._test_mouse_click(device, 'RIGHT') 3842 3843 3844 def _test_mouse_move(self, device, delta_x=0, delta_y=0): 3845 """Test that the mouse move events could be received correctly. 3846 3847 @param device: the meta device containing a bluetooth HID device 3848 @param delta_x: the distance to move cursor in x axis 3849 @param delta_y: the distance to move cursor in y axis 3850 3851 @returns: True if the report received by the host matches the 3852 expected one. False otherwise. 3853 3854 """ 3855 gesture = lambda: device.Move(delta_x, delta_y) 3856 actual_events = self._record_input_events(device, 3857 gesture, 3858 address=device.address) 3859 3860 events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else [] 3861 events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else [] 3862 expected_events = events_x + events_y + [recorder.SYN_EVENT] 3863 3864 self.results = { 3865 'actual_events': list(map(str, actual_events)), 3866 'expected_events': list(map(str, expected_events))} 3867 return actual_events == expected_events 3868 3869 3870 @test_retry_and_log 3871 def test_mouse_move_in_x(self, device, delta_x): 3872 """Test that the mouse move events in x could be received correctly. 3873 3874 @param device: the meta device containing a bluetooth HID device 3875 @param delta_x: the distance to move cursor in x axis 3876 3877 @returns: True if the report received by the host matches the 3878 expected one. False otherwise. 3879 3880 """ 3881 return self._test_mouse_move(device, delta_x=delta_x) 3882 3883 3884 @test_retry_and_log 3885 def test_mouse_move_in_y(self, device, delta_y): 3886 """Test that the mouse move events in y could be received correctly. 3887 3888 @param device: the meta device containing a bluetooth HID device 3889 @param delta_y: the distance to move cursor in y axis 3890 3891 @returns: True if the report received by the host matches the 3892 expected one. False otherwise. 3893 3894 """ 3895 return self._test_mouse_move(device, delta_y=delta_y) 3896 3897 3898 @test_retry_and_log 3899 def test_mouse_move_in_xy(self, device, delta_x=-60, delta_y=100): 3900 """Test that the mouse move events could be received correctly. 3901 3902 @param device: the meta device containing a bluetooth HID device 3903 @param delta_x: the distance to move cursor in x axis 3904 @param delta_y: the distance to move cursor in y axis 3905 3906 @returns: True if the report received by the host matches the 3907 expected one. False otherwise. 3908 3909 """ 3910 return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y) 3911 3912 3913 def _test_mouse_scroll(self, device, units): 3914 """Test that the mouse wheel events could be received correctly. 3915 3916 @param device: the meta device containing a bluetooth HID device 3917 @param units: the units to scroll in y axis 3918 3919 @returns: True if the report received by the host matches the 3920 expected one. False otherwise. 3921 3922 """ 3923 gesture = lambda: device.Scroll(units) 3924 recorded_events = self._record_input_events(device, 3925 gesture, 3926 address=device.address) 3927 3928 # Since high-speed scrolling events are inserted after they are passed 3929 # through bluetooth module, we ignore these events since they are 3930 # irrelevant for us 3931 scroll_events = [ev for ev in recorded_events 3932 if ev.code != REL_WHEEL_HI_RES] 3933 3934 expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT] 3935 self.results = { 3936 'scroll_events': list(map(str, scroll_events)), 3937 'expected_events': list(map(str, expected_events))} 3938 return scroll_events == expected_events 3939 3940 3941 @test_retry_and_log 3942 def test_mouse_scroll_down(self, device, delta_y): 3943 """Test that the mouse wheel events could be received correctly. 3944 3945 @param device: the meta device containing a bluetooth HID device 3946 @param delta_y: the units to scroll down in y axis; 3947 should be a postive value 3948 3949 @returns: True if the report received by the host matches the 3950 expected one. False otherwise. 3951 3952 """ 3953 if delta_y > 0: 3954 return self._test_mouse_scroll(device, delta_y) 3955 else: 3956 raise error.TestError('delta_y (%d) should be a positive value', 3957 delta_y) 3958 3959 3960 @test_retry_and_log 3961 def test_mouse_scroll_up(self, device, delta_y): 3962 """Test that the mouse wheel events could be received correctly. 3963 3964 @param device: the meta device containing a bluetooth HID device 3965 @param delta_y: the units to scroll up in y axis; 3966 should be a postive value 3967 3968 @returns: True if the report received by the host matches the 3969 expected one. False otherwise. 3970 3971 """ 3972 if delta_y > 0: 3973 return self._test_mouse_scroll(device, -delta_y) 3974 else: 3975 raise error.TestError('delta_y (%d) should be a positive value', 3976 delta_y) 3977 3978 3979 @test_retry_and_log 3980 def test_mouse_click_and_drag(self, device, delta_x, delta_y): 3981 """Test that the mouse click-and-drag events could be received 3982 correctly. 3983 3984 @param device: the meta device containing a bluetooth HID device 3985 @param delta_x: the distance to drag in x axis 3986 @param delta_y: the distance to drag in y axis 3987 3988 @returns: True if the report received by the host matches the 3989 expected one. False otherwise. 3990 3991 """ 3992 gesture = lambda: device.ClickAndDrag(delta_x, delta_y) 3993 actual_events = self._record_input_events(device, 3994 gesture, 3995 address=device.address) 3996 3997 button = 'LEFT' 3998 expected_events = ( 3999 [# Button down 4000 recorder.MSC_SCAN_BTN_EVENT[button], 4001 Event(EV_KEY, BTN_LEFT, 1), 4002 recorder.SYN_EVENT] + 4003 # cursor movement in x and y 4004 ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) + 4005 ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) + 4006 [recorder.SYN_EVENT] + 4007 # Button up 4008 [recorder.MSC_SCAN_BTN_EVENT[button], 4009 Event(EV_KEY, BTN_LEFT, 0), 4010 recorder.SYN_EVENT]) 4011 4012 self.results = { 4013 'actual_events': list(map(str, actual_events)), 4014 'expected_events': list(map(str, expected_events))} 4015 return actual_events == expected_events 4016 4017 4018 # ------------------------------------------------------------------- 4019 # Bluetooth keyboard related tests 4020 # ------------------------------------------------------------------- 4021 4022 @test_retry_and_log 4023 def test_keyboard_input_from_trace(self, device, trace_name): 4024 """ Tests that keyboard events can be transmitted and received correctly 4025 4026 @param device: the meta device containing a bluetooth HID device 4027 @param trace_name: string name for keyboard activity trace to be used 4028 in the test i.e. "simple_text" 4029 4030 @returns: true if the recorded output matches the expected output 4031 false otherwise 4032 """ 4033 length_correct = True 4034 content_correct = True 4035 4036 # Read data from trace I/O files 4037 input_trace = bluetooth_test_utils.parse_trace_file(os.path.join( 4038 TRACE_LOCATION, '{}_input.txt'.format(trace_name))) 4039 output_trace = bluetooth_test_utils.parse_trace_file(os.path.join( 4040 TRACE_LOCATION, '{}_output.txt'.format(trace_name))) 4041 4042 if not input_trace or not output_trace: 4043 logging.error('Failure in using trace') 4044 return False 4045 4046 # Disregard timing data for now 4047 input_scan_codes = [tup[1] for tup in input_trace] 4048 predicted_events = [Event(*tup[1]) for tup in output_trace] 4049 4050 # Create and run this trace as a gesture 4051 gesture = lambda: device.KeyboardSendTrace(input_scan_codes) 4052 rec_events = self._record_input_events(device, 4053 gesture, 4054 address=device.address) 4055 4056 # Filter out any input events that were not from the keyboard 4057 rec_key_events = [ev for ev in rec_events if ev.type == EV_KEY] 4058 4059 # Fail if we didn't record the correct number of events 4060 if len(rec_key_events) != len(input_scan_codes): 4061 logging.info('Expected {} events, received {}'.format( 4062 len(input_scan_codes), len(rec_key_events))) 4063 length_correct = False 4064 4065 for idx, predicted in enumerate(predicted_events): 4066 recorded = rec_key_events[idx] 4067 4068 if not predicted == recorded: 4069 content_correct = False 4070 break 4071 4072 self.results = { 4073 'received_events': len(rec_key_events) > 0, 4074 'length_correct': length_correct, 4075 'content_correct': content_correct, 4076 } 4077 4078 return all(self.results) 4079 4080 4081 def is_newer_kernel_version(self, version, minimum_version): 4082 """ Check if given kernel version is newer than unsupported version.""" 4083 4084 return utils.compare_versions(version, minimum_version) >= 0 4085 4086 4087 def is_supported_kernel_version(self, kernel_version, minimum_version, 4088 msg=None): 4089 """ Check if kernel version is greater than minimum version. 4090 4091 Check if given kernel version is greater than or equal to minimum 4092 version. Raise TEST_NA if given kernel version is lower than the 4093 minimum version. 4094 4095 Note: Kernel version may have suffixes, so ensure that minimum 4096 version should be the smallest version that is permissible. 4097 Ex: If minimum version is 3.8.11 then 3.8.11-<random> will 4098 pass the check. 4099 4100 @param kernel_version: kernel version to be checked as a string 4101 @param: minimum_version: minimum kernel version requried 4102 4103 @returns: None 4104 4105 @raises: TEST_NA if kernel version is not greater than the minimum 4106 version 4107 """ 4108 4109 logging.debug('kernel version is {} minimum version' 4110 'is {}'.format(kernel_version,minimum_version)) 4111 4112 if msg is None: 4113 msg = 'Test not supported on this kernel version' 4114 4115 if not self.is_newer_kernel_version(kernel_version, minimum_version): 4116 logging.info('Kernel version check failed: %s', msg) 4117 raise error.TestNAError(msg) 4118 4119 logging.debug('Kernel version check passed') 4120 4121 4122 # ------------------------------------------------------------------- 4123 # Bluetooth AVRCP related test 4124 # ------------------------------------------------------------------- 4125 4126 4127 @test_retry_and_log 4128 def test_avrcp_event(self, device, generator, avrcp_event): 4129 """Tests that AVRCP events can be transmitted and received correctly 4130 4131 @param device: the meta device containing a Bluetooth AVRCP capable 4132 audio device. 4133 @param generator: the peer device generator/function which trigger 4134 the AVRCP event. 4135 @param avrcp_event: the AVRCP event to test. 4136 4137 @returns: true if the recorded output matches the expected output 4138 false otherwise 4139 """ 4140 logging.debug('AVRCP Event Test, Event: %s', avrcp_event) 4141 linux_input_button = {'play': KEY_PLAYCD, 'pause': KEY_PAUSECD, 4142 'stop': KEY_STOPCD, 'next': KEY_NEXTSONG, 4143 'previous': KEY_PREVIOUSSONG} 4144 expected_event = [ 4145 # Button down 4146 Event(EV_KEY, linux_input_button[avrcp_event], 1), 4147 recorder.SYN_EVENT, 4148 # Button up 4149 Event(EV_KEY, linux_input_button[avrcp_event], 0), 4150 recorder.SYN_EVENT] 4151 4152 gesture = lambda: generator(avrcp_event) 4153 actual_event = self._record_input_events(device, gesture) 4154 4155 return actual_event == expected_event 4156 4157 4158 # ------------------------------------------------------------------- 4159 # Enterprise policy tests 4160 # ------------------------------------------------------------------- 4161 4162 def _test_check_set_allowlist(self, uuids, expected_result): 4163 """The test to set valid and invalid allowlists test. 4164 4165 @param uuids: the uuids in the allowlist to set. 4166 @param expected_result: True if the test is expected to pass. 4167 """ 4168 create_uuid = bluetooth_test_utils.Bluetooth_UUID.create_valid_uuid 4169 exp_res_str = 'valid' if expected_result else 'invalid' 4170 logging.info('%s uuids: "%s"', exp_res_str, uuids) 4171 4172 result, err_msg = self.bluetooth_facade.policy_set_service_allow_list( 4173 uuids) 4174 logging.debug('result %s (%s)', result, err_msg) 4175 4176 if expected_result: 4177 check_set_allowlist = result 4178 else: 4179 check_set_allowlist = ('org.bluez.Error.InvalidArguments' in err_msg 4180 and not result) 4181 4182 # Query bluez to read the allow list. 4183 actual_uuids_list = [ 4184 create_uuid(uuid) for uuid in 4185 self.bluetooth_facade.policy_get_service_allow_list()] 4186 actual_uuids_list.sort() 4187 4188 # Convert the original UUIDs into a list of full-length UUIDs and 4189 # remove duplicate UUIDs in order to compare the original UUIDs 4190 # with the actual UUIDs set by bluez. 4191 orig_uuids_list = [] 4192 if expected_result and uuids != '': 4193 for uuid in uuids.split(','): 4194 u = create_uuid(uuid) 4195 if u is None: 4196 raise error.TestFail('uuid %s in uuids %s is not valid' % 4197 (uuid, uuids)) 4198 orig_uuids_list.append(u) 4199 orig_dedup_uuids = list(set(orig_uuids_list)) 4200 orig_dedup_uuids.sort() 4201 uuids_comp_result = actual_uuids_list == orig_dedup_uuids 4202 4203 self.results = {'uuids': uuids, 4204 'expected_set_allowlist_result': expected_result, 4205 'actual_set_allowlist_result': result, 4206 'orig_dedup_uuids': orig_dedup_uuids, 4207 'actual_uuids_list': actual_uuids_list, 4208 'check_set_allowlist': check_set_allowlist, 4209 'uuids_comp_result': uuids_comp_result} 4210 logging.debug('actual_uuids_list %s', actual_uuids_list) 4211 logging.debug('orig_uuids_list %s', orig_uuids_list) 4212 4213 return (check_set_allowlist and uuids_comp_result) 4214 4215 4216 @test_retry_and_log(False) 4217 def test_check_set_allowlist(self, uuids, expected_result): 4218 """The test to set valid and invalid allowlists test. 4219 4220 @param uuids: the uuids in the allowlist to set. 4221 @param expected_result: True if the test is expected to pass. 4222 """ 4223 return self._test_check_set_allowlist(uuids, expected_result) 4224 4225 4226 @test_retry_and_log(False) 4227 def test_reset_allowlist(self): 4228 """The test to reset the allowlists. 4229 4230 The test is used to clean up the allowlist. 4231 """ 4232 return self._test_check_set_allowlist('', True) 4233 4234 4235 def policy_is_affected(self, device): 4236 """Check if the device is affected by policy. 4237 4238 @param device: the connected device. 4239 4240 @returns: True if the device is affected by the enterprise policy. 4241 False if not. None if the device is not found. 4242 """ 4243 return self.bluetooth_facade.policy_get_device_affected(device.address) 4244 4245 4246 @test_retry_and_log(False) 4247 def test_affected_by_policy(self, device): 4248 """A test that the device is affected by policy 4249 4250 @param device: the peripheral device 4251 @returns: True if the device is affected; False otherwise. 4252 """ 4253 result = self.policy_is_affected(device) 4254 logging.debug('policy_is_affected(%s): %s', device.address, result) 4255 self.results = { 4256 'expected_result': 'True (affected)', 4257 'actual_result': result 4258 } 4259 return result is True 4260 4261 4262 @test_retry_and_log(False) 4263 def test_not_affected_by_policy(self, device): 4264 """A test that the device is not affected by policy 4265 4266 @param device: the peripheral device 4267 @returns: True if the device is not affected; False otherwise. 4268 """ 4269 result = self.policy_is_affected(device) 4270 logging.debug('policy_is_affected(%s): %s', device.address, result) 4271 self.results = { 4272 'expected_result': 'False (not affected)', 4273 'actual_result': result 4274 } 4275 return result is False 4276 4277 def check_if_affected_by_policy(self, device, expected_result): 4278 """A test that the device policy is enforced correctly 4279 4280 @param device: the peripheral device 4281 @param expected_result: True if the test is expected to pass. 4282 4283 @returns: True if the device is affected or not affected per 4284 expected_result; False otherwise. 4285 """ 4286 if expected_result: 4287 return self.test_affected_by_policy(device) 4288 else: 4289 return self.test_not_affected_by_policy(device) 4290 4291 4292 # ------------------------------------------------------------------- 4293 # Servod related tests 4294 # ------------------------------------------------------------------- 4295 4296 @test_retry_and_log 4297 def test_power_consumption(self, device, max_power_mw): 4298 """Test the average power consumption.""" 4299 power_mw = device.servod.MeasurePowerConsumption() 4300 self.results = {'power_mw': power_mw} 4301 4302 if (power_mw is None): 4303 logging.error('Failed to measure power consumption') 4304 return False 4305 4306 power_mw = float(power_mw) 4307 logging.info('power consumption (mw): %f (max allowed: %f)', 4308 power_mw, max_power_mw) 4309 4310 return power_mw <= max_power_mw 4311 4312 4313 @test_retry_and_log 4314 def test_start_notify(self, object_path, cccd_value): 4315 """Test that a notification can be started on a characteristic 4316 4317 @param object_path: the object path of the characteristic. 4318 @param cccd_value: Possible CCCD values include 4319 0x00 - inferred from the remote characteristic's properties 4320 0x01 - notification 4321 0x02 - indication 4322 4323 @returns: The test results. 4324 4325 """ 4326 if object_path is None: 4327 logging.error('Invalid object path') 4328 return False 4329 4330 start_notify = self.bluetooth_facade.start_notify( 4331 object_path, cccd_value) 4332 is_notifying = self._wait_for_condition( 4333 lambda: self.bluetooth_facade.is_notifying( 4334 object_path), method_name()) 4335 4336 self.results = { 4337 'start_notify': start_notify, 4338 'is_notifying': is_notifying} 4339 4340 return all(self.results.values()) 4341 4342 4343 @test_retry_and_log 4344 def test_stop_notify(self, object_path): 4345 """Test that a notification can be stopped on a characteristic 4346 4347 @param object_path: the object path of the characteristic. 4348 4349 @returns: The test results. 4350 4351 """ 4352 if object_path is None: 4353 logging.error('Invalid object path') 4354 return False 4355 4356 stop_notify = self.bluetooth_facade.stop_notify(object_path) 4357 is_not_notifying = self._wait_for_condition( 4358 lambda: not self.bluetooth_facade.is_notifying( 4359 object_path), method_name()) 4360 4361 self.results = { 4362 'stop_notify': stop_notify, 4363 'is_not_notifying': is_not_notifying} 4364 4365 return all(self.results.values()) 4366 4367 4368 @test_retry_and_log(False) 4369 def test_set_discovery_filter(self, filter): 4370 """Test set discovery filter""" 4371 4372 return self.bluetooth_facade.set_discovery_filter(filter) 4373 4374 4375 @test_retry_and_log(False) 4376 def test_set_le_connection_parameters(self, address, parameters): 4377 """Test set LE connection parameters""" 4378 4379 return self.bluetooth_facade.set_le_connection_parameters( 4380 address, parameters) 4381 4382 4383 @test_retry_and_log(False) 4384 def test_get_connection_info(self, address): 4385 """Test that connection info to device is retrievable.""" 4386 4387 return (self.bluetooth_facade.get_connection_info(address) 4388 is not None) 4389 4390 4391 @test_retry_and_log(False, messages_stop=False) 4392 def test_suspend_and_wait_for_sleep(self, suspend, sleep_timeout): 4393 """ Suspend the device and wait until it is sleeping. 4394 4395 @param suspend: Sub-process that does the actual suspend call. 4396 @param sleep_timeout time limit in seconds to allow the host sleep. 4397 4398 @return True if host is asleep within a short timeout, False otherwise. 4399 """ 4400 suspend.start() 4401 try: 4402 self.host.test_wait_for_sleep(sleep_timeout) 4403 except Exception as e: 4404 suspend.join() 4405 self.results = {'exception': str(e)} 4406 return False 4407 4408 return True 4409 4410 4411 @test_retry_and_log(False, messages_start=False) 4412 def test_wait_for_resume(self, 4413 boot_id, 4414 suspend, 4415 resume_timeout, 4416 test_start_time, 4417 resume_slack=RESUME_DELTA, 4418 fail_on_timeout=False, 4419 fail_early_wake=True, 4420 collect_resume_time=False): 4421 """ Wait for device to resume from suspend. 4422 4423 @param boot_id: Current boot id 4424 @param suspend: Sub-process that does actual suspend call. 4425 @param resume_timeout: Expect device to resume in given timeout. 4426 @param test_start_time: When was this test started? (device time) 4427 @param resume_slack: Allow some slack on resume timeout. 4428 @param fail_on_timeout: Fails if timeout is reached 4429 @param fail_early_wake: Fails if timeout isn't reached 4430 @param collect_resume_time: Collect time to resume as perf keyval. 4431 4432 @return True if suspend sub-process completed without error. 4433 """ 4434 success = False 4435 results = {} 4436 4437 def _check_timeout(delta): 4438 if delta > timedelta(seconds=resume_timeout): 4439 return not fail_on_timeout 4440 else: 4441 return not fail_early_wake 4442 4443 def _check_suspend_attempt_or_raise(test_start, wake_at): 4444 """Make sure suspend attempt was recent or raise TestNA. 4445 4446 If we're looking at a previous suspend attempt, it means the test 4447 didn't trigger a suspend properly (i.e. no powerd call) 4448 4449 @param test_start: When we started the test. 4450 @param wake_at: When powerd suspend resumed. 4451 4452 @raises: error.TestNAError if found suspend occurred before we 4453 started the test. 4454 """ 4455 # If the last suspend attempt was before we started the test, it's 4456 # probably not a recent attempt. 4457 if wake_at < test_start: 4458 raise error.TestNAError( 4459 'No recent suspend attempt found. ' 4460 'Started test at {} but last suspend ended at {}'. 4461 format(test_start, wake_at)) 4462 4463 # If the last suspend attempt recorded time is some time in the 4464 # future, probably a time conversion error occurred. 4465 current_time = self.bluetooth_facade.get_device_utc_time() 4466 if current_time < wake_at: 4467 raise error.TestFail( 4468 'Timezone conversion error found. ' 4469 'Last suspend ended at {} but current time is {}'. 4470 format(wake_at, current_time)) 4471 4472 return True 4473 4474 def _check_retcode_or_raise(retcode): 4475 """Make sure powerd return was successful. 4476 4477 @param retcode: Return code of powerd_suspend. 4478 4479 @raises: error.TestNAError if failed suspend due to non-BT 4480 @return: False if BT woke us, True otherwise 4481 """ 4482 if retcode: 4483 if self.bluetooth_facade.bt_caused_last_resume(): 4484 return False 4485 else: 4486 raise error.TestNAError( 4487 'Failed suspend due to non-BT wake') 4488 4489 return True 4490 4491 # Sometimes it takes longer to resume from suspend; give some leeway 4492 resume_timeout = resume_timeout + resume_slack 4493 results['resume timeout'] = resume_timeout 4494 try: 4495 start = datetime.now() 4496 4497 # Wait for resume needs to wait longer in case device rebooted. 4498 # Otherwise, the test will fail with errno 111 (connection refused) 4499 self.host.test_wait_for_resume( 4500 boot_id, resume_timeout=self.RESUME_INTERNAL_TIMEOUT_SECS) 4501 4502 results['device accessible on resume'] = True 4503 4504 # As of now, a timeout in test_wait_for_resume doesn't raise. Start 4505 # by first measuring the delta until network is back up to the dut. 4506 network_delta = datetime.now() - start 4507 4508 # Use powerd logs to see how much time we actually spent in suspend 4509 # If the network went down during suspend, we will have spent less 4510 # time in suspend than expected. If we can't find info via powerd, 4511 # we can use measured time instead. 4512 info = self.bluetooth_facade.find_last_suspend_via_powerd_logs() 4513 if info: 4514 start_suspend_at, end_suspend_at, retcode = info 4515 logging.debug('find_last_suspend_via_powerd_logs returned: ' 4516 'start_suspend_at: {}, end_suspend_at: {}, ' 4517 'retcode {}'.format(start_suspend_at, 4518 end_suspend_at, retcode)) 4519 actual_delta = end_suspend_at - start_suspend_at 4520 results['powerd time to resume'] = actual_delta.total_seconds() 4521 results['powerd retcode'] = retcode 4522 4523 # Resume is successful if suspend occurred correctly and woke up 4524 # within the timeout. One significant caveat is that we only 4525 # fail here if BT blocked suspend, not if we woke spuriously. 4526 # This is by design (we depend on the timeout to check for 4527 # spurious wakeup). 4528 try: 4529 suspend_ok, retcode_ok, timeout_ok = False, False, False 4530 suspend_ok = _check_suspend_attempt_or_raise( 4531 test_start_time, end_suspend_at) 4532 retcode_ok = _check_retcode_or_raise(retcode) 4533 timeout_ok = _check_timeout(actual_delta) 4534 except error.TestNAError as e: 4535 raise e 4536 finally: 4537 logging.debug('_check_suspend_attempt_or_raise: {} ' 4538 '_check_retcode_or_raise: {} ' 4539 '_check_timeout: {}'.format( 4540 suspend_ok, retcode_ok, timeout_ok)) 4541 success = suspend_ok and retcode_ok and timeout_ok 4542 else: 4543 results['time to resume'] = network_delta.total_seconds() 4544 logging.debug( 4545 'Unable to get time to resume from powerd. Estimate sleep time ' 4546 'using network ping') 4547 success = _check_timeout(network_delta) 4548 except error.TestFail as e: 4549 results['device accessible on resume'] = False 4550 success = False 4551 logging.error('wait_for_resume: %s', e) 4552 4553 # If the resume failed due to a reboot, raise the testFail and exit 4554 # early from the test 4555 if 'client rebooted' in str(e): 4556 raise 4557 finally: 4558 suspend.join() 4559 4560 # Log wake performance 4561 if collect_resume_time: 4562 test_desc = '{}_wake_time'.format(self.test_name.replace(' ', '_')) 4563 wake_time = results.get('powerd time to resume', 4564 results.get('time to resume', 0)) 4565 # Only write perf if wake time exists (non-zero) 4566 if wake_time: 4567 self.write_perf_keyval({test_desc: wake_time}) 4568 4569 results['success'] = success 4570 results['suspend exit code'] = suspend.exitcode 4571 self.results = results 4572 4573 logging.info('test_wait_for_resume(): %r', results) 4574 return all([success, suspend.exitcode == 0]) 4575 4576 4577 def suspend_async(self, suspend_time, expect_bt_wake=False): 4578 """ Suspend asynchronously and return process for joining 4579 4580 @param suspend_time: how long to stay in suspend 4581 @param expect_bt_wake: Whether we expect bluetooth to wake us from 4582 suspend. If true, we expect this resume will occur early 4583 4584 @returns multiprocessing.Process object with suspend task 4585 """ 4586 4587 def _action_suspend(): 4588 try: 4589 self.bluetooth_facade.do_suspend(suspend_time, expect_bt_wake) 4590 except socket.error as e: 4591 # Socket errors may occur after suspend if the underlying 4592 # connection is lost during suspend (happens if usb-ethernet 4593 # disconnects and reconnects on resume). Catch all these errors 4594 # and swallow them. 4595 logging.warning( 4596 'Socket error on suspend. Swallowing error: %s', 4597 str(e)) 4598 return 0 4599 4600 proc = multiprocessing.Process(target=_action_suspend) 4601 proc.daemon = True 4602 return proc 4603 4604 4605 def device_connect_async(self, 4606 device_type, 4607 device, 4608 adapter_address, 4609 delay_wake=1, 4610 should_wake=True): 4611 """ Connects peer device asynchronously with DUT. 4612 4613 This function uses a thread instead of a subprocess so that the test 4614 result is stored for the test. Otherwise, the test connection was 4615 sometimes failing but the test itself was passing. 4616 4617 @param device_type: The device type (used to check if it's LE) 4618 @param device: the meta device with the peer device 4619 @param adapter_address: the address of the adapter 4620 @param delay_wake: delay wakeup by this many seconds 4621 @param should_wake: Should this cause a wakeup? 4622 4623 @returns threading.Thread object with device connect task 4624 """ 4625 4626 def _action_device_connect(): 4627 time.sleep(delay_wake) 4628 if 'BLE' in device_type: 4629 # LE reconnects by advertising (dut controller will create LE 4630 # connection, not the peer device) 4631 self.test_device_set_discoverable(device, True) 4632 else: 4633 # Classic requires peer to initiate a connection to wake up the 4634 # dut 4635 connect_func = self.test_connection_by_device_only 4636 if should_wake: 4637 connect_func(device, adapter_address) 4638 else: 4639 # If we're not expecting wake, this connect attempt will 4640 # probably fail. 4641 self.ignore_failure(connect_func, device, adapter_address) 4642 4643 thread = threading.Thread(target=_action_device_connect) 4644 return thread 4645 4646 4647 @test_retry_and_log(False) 4648 def test_hid_device_created(self, device_address): 4649 """ Tests that the hid device is created before using it for tests. 4650 4651 @param device_address: Address of peripheral device 4652 """ 4653 device_found = self.bluetooth_facade.wait_for_hid_device( 4654 device_address) 4655 self.results = { 4656 'device_found': device_found 4657 } 4658 return all(self.results.values()) 4659 4660 4661 @test_retry_and_log(False) 4662 def test_hid_device_created_speed(self, device): 4663 """ Tests that the hid device is created with faster polling. 4664 4665 @param device: Peripheral device 4666 """ 4667 device_found = self.bluetooth_facade.wait_for_hid_device( 4668 device_address=device.address, sleep_interval=0.1) 4669 self.results = {'device_found': device_found} 4670 return all(self.results.values()) 4671 4672 4673 @test_retry_and_log(False) 4674 def test_hid_device_reconnect_time(self, duration, device_type): 4675 """ Tests that the hid device reconnection is fast enough. 4676 4677 @param duration: The averaged duration of HID reconnection 4678 @param device_type: Specified the type of the device 4679 """ 4680 4681 if 'BLE' in device_type: 4682 max_duration = LE_HID_RECONNECT_TIME_MAX_SEC 4683 else: 4684 max_duration = HID_RECONNECT_TIME_MAX_SEC 4685 4686 self.results = { 4687 'hid_reconnect_time': duration, 4688 'max_passing_time': max_duration 4689 } 4690 return duration < max_duration 4691 4692 4693 @test_retry_and_log 4694 def test_battery_reporting(self, device): 4695 """ Tests that battery reporting through GATT can be received 4696 4697 @param device: the meta device containing a Bluetooth device 4698 4699 @returns: true if battery reporting is received 4700 """ 4701 4702 def _get_battery_percentage(): 4703 return self.bluetooth_facade.get_battery_property( 4704 device.address, 'Percentage') 4705 4706 # Sometimes the battery interface isn't available on the device 4707 # right away. Wait for it to become available. 4708 utils.poll_for_condition( 4709 condition=lambda: _get_battery_percentage() is not None, 4710 timeout=self.ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS, 4711 sleep_interval=self.ADAPTER_POLLING_DEFAULT_SLEEP_SECS, 4712 desc='Waiting for battery on %s' % device.address) 4713 4714 return _get_battery_percentage() > 0 4715 4716 def _apply_new_adapter_alias(self, alias): 4717 """ Sets new system alias and applies discoverable setting 4718 4719 @param alias: string alias to be applied to Adapter->Alias property 4720 """ 4721 4722 # Set Adapter's Alias property 4723 self.bluetooth_facade.set_adapter_alias(alias) 4724 4725 # Set discoverable setting on 4726 self.bluetooth_facade.set_discoverable(True) 4727 4728 @test_retry_and_log(False) 4729 def test_set_adapter_alias(self, alias): 4730 """ Validates that a new adapter alias is applied correctly 4731 4732 @param alias: string alias to be applied to Adapter->Alias property 4733 4734 @returns: True if the applied alias is properly applied in btmon trace 4735 """ 4736 4737 orig_alias = self.get_adapter_properties()['Alias'] 4738 self.bluetooth_le_facade = self.bluetooth_facade 4739 4740 # 1. Capture btmon logs around alias set operation 4741 self._get_btmon_log(lambda: self._apply_new_adapter_alias(alias)) 4742 4743 # 2. Verify that name appears in btmon trace with the following format: 4744 # "Name (complete): Chromebook_BA0E" as appears in EIR data set 4745 expected_alias_str = 'Name (complete): ' + alias 4746 alias_found = self.bluetooth_facade.btmon_find(expected_alias_str) 4747 4748 # 3. Re-apply previous bluez alias as other tests expect default 4749 self.bluetooth_facade.set_adapter_alias(orig_alias) 4750 4751 self.results = {'alias_found': alias_found} 4752 return all(self.results.values()) 4753 4754 # ------------------------------------------------------------------- 4755 # Autotest methods 4756 # ------------------------------------------------------------------- 4757 4758 4759 def initialize(self): 4760 """Initialize bluetooth adapter tests.""" 4761 # Run through every tests and collect failed tests in self.fails. 4762 self.fails = [] 4763 4764 # If a test depends on multiple conditions, write the results of 4765 # the conditions in self.results so that it is easy to know 4766 # what conditions failed by looking at the log. 4767 self.results = None 4768 4769 # If any known failures were seen in the logs at any time during this 4770 # test execution, we capture that here. This includes daemon crashes, 4771 # usb disconnects or any of the other known common failure reasons 4772 self.had_known_common_failure = False 4773 4774 # Some tests may instantiate a peripheral device for testing. 4775 self.devices = dict() 4776 for device_type in SUPPORTED_DEVICE_TYPES: 4777 self.devices[device_type] = list() 4778 4779 # The count of registered advertisements. 4780 self.count_advertisements = 0 4781 4782 4783 def get_device_sample_rssi(self, device, use_cached_value=True): 4784 """ Get one RSSI value of the given device. 4785 4786 @param device: the peer device to be examined RSSI 4787 @param use_cached_value: Use the cached value 4788 4789 @returns: rssi value if the device is found, 4790 None otherwise 4791 """ 4792 4793 # Maximum retry attempts of RSSI query 4794 MAX_RETRY = 3 4795 # Time between each RSSI query 4796 WAIT_TIME = 2 4797 rssi = None 4798 4799 # device could have tested RSSI if we enable check_rssi, if so, reuse it 4800 # 4801 # Note: 4802 # device is special in that hasattr(device, xxx) will evaluate to 4803 # the _Method class if xxx does not physically exist. Hence, 4804 # isinstance(device.rssi, int) instead of hasattr(device, 'rssi') 4805 # is used as the condition below. 4806 # Refer to class _Method in client/cros/chameleon/chameleon.py 4807 if isinstance(device.rssi, int) and use_cached_value: 4808 return device.rssi 4809 4810 try: 4811 self.test_start_discovery() 4812 4813 # The RSSI property is only maintained while discovery is 4814 # enabled. Stopping discovery removes the property. Thus, look 4815 # up the RSSI without modifying discovery state. 4816 found = self.test_discover_device(device.address, 4817 start_discovery=False, 4818 stop_discovery=False) 4819 4820 if not found: 4821 logging.info('Device %s not found', device.address) 4822 return None 4823 4824 for i in range(MAX_RETRY): 4825 rssi = self.bluetooth_facade.get_device_property( 4826 device.address, 'RSSI') 4827 if rssi: 4828 break 4829 time.sleep(WAIT_TIME) 4830 4831 if not rssi: 4832 logging.info('RSSI of device %s not found', device.address) 4833 return None 4834 4835 device.rssi = rssi 4836 logging.info('Peer {} RSSI {}'.format(device.address, rssi)) 4837 4838 finally: 4839 self.test_stop_discovery() 4840 logging.info('Clearing device for test: {}'.format(device.address)) 4841 self.bluetooth_facade.remove_device_object(device.address) 4842 4843 return rssi 4844 4845 def check_floss_support(self): 4846 """ Check whether this device supports Floss 4847 4848 Check for the presence of /usr/bin/btmanagerd and fail with TESTNA 4849 if the file is not present. This should only fail on the following boards 4850 with 2GB rootfs where Floss is not enabled 4851 ['asuka', 'banon', 'bob', 'caroline', 'cave', 'celes', 4852 'chell', 'coral', 'cyan', 'edgar', 'elm', 'gru', 'hana', 4853 'kefka', 'kevin', 'lars', 'pyro', 'reef', 'reks', 'relm', 4854 'sand', 'scarlet', 'sentry', 'setzer', 'snappy', 'terra', 'ultima'] 4855 4856 4857 @raises error.TestNA if device doesn't support Floss 4858 """ 4859 if not self.bluetooth_facade.is_btmanagerd_present(): 4860 raise error.TestNAError('Floss cannot be enabled on this device') 4861 4862 def verify_device_rssi(self, device_list): 4863 """ Test device rssi is over required threshold. 4864 4865 @param device_list: List of peer devices to verify rssi 4866 4867 @raises error.TestNA if any device isn't found or RSSI is too low 4868 """ 4869 try: 4870 self.test_start_discovery() 4871 for device in device_list: 4872 # The RSSI property is only maintained while discovery is 4873 # enabled. Stopping discovery removes the property. Thus, look 4874 # up the RSSI without modifying discovery state. 4875 found = self.test_discover_device(device.address, 4876 start_discovery=False, 4877 stop_discovery=False) 4878 rssi = self.bluetooth_facade.get_device_property( 4879 device.address, 'RSSI') 4880 4881 if not found: 4882 # Not clearing self.fails will result in test 4883 # failing with test_discover_device failure 4884 self.fails = [] 4885 logging.info( 4886 'Failing with TEST_NA as peer %s was not' 4887 ' discovered during RSSI check', device.address) 4888 raise error.TestNAError( 4889 'Peer {} not discovered during RSSI check'.format( 4890 device.address)) 4891 4892 if not rssi or rssi < self.MIN_RSSI: 4893 logging.info('Failing with TEST_NA since RSSI (%s) is low ', 4894 rssi) 4895 raise error.TestNAError( 4896 'Peer {} RSSI is too low: {}'.format( 4897 device.address, rssi)) 4898 device.rssi = rssi 4899 4900 logging.info('Peer {} RSSI {}'.format(device.address, rssi)) 4901 finally: 4902 self.test_stop_discovery() 4903 4904 for device in device_list: 4905 logging.info('Clearing device for test: {}'.format( 4906 device.address)) 4907 self.bluetooth_facade.remove_device_object(device.address) 4908 4909 def verify_controller_capability(self, required_roles=[], 4910 test_type=''): 4911 """Raise an exception if required role support isn't present 4912 4913 @param required_roles: List of test role requirements in 4914 ["central", "peripheral", "central-peripheral"] 4915 4916 @raises: error.TestFail if device does not meet requirements 4917 AND test_type is 'AVL' 4918 error.TestNA if device does not meet requirements 4919 and test_type is not 'AVL' 4920 """ 4921 4922 adapter_props = self.get_adapter_properties() 4923 4924 supported_roles = adapter_props.get('Roles', []) 4925 4926 for req in required_roles: 4927 if req not in supported_roles: 4928 # We don't meet requirements, throw error 4929 msg = 'Role requirement {} not in supported modes {}'.format( 4930 req, supported_roles) 4931 4932 if test_type == 'AVL': 4933 raise error.TestFail(msg) 4934 4935 logging.info('Failing with TEST_NA due to %s', msg) 4936 raise error.TestNAError(msg) 4937 4938 4939 def set_fail_fast(self, args_dict, default=False): 4940 """Set whether the test should fail fast if running into any problem 4941 4942 By default it should not fail fast so that a batch test can continue 4943 running the rest after a failure in one test 4944 4945 :param args_dict: the arguments passed int from the command line 4946 :param default: the default value when the flag is missing from the 4947 args_dict 4948 4949 """ 4950 flag_name = 'fail_fast' 4951 if args_dict and flag_name in args_dict: 4952 self.fail_fast = bool(args_dict[flag_name].lower() == 'true') 4953 else: 4954 self.fail_fast = default 4955 4956 4957 def assert_discover_and_pair(self, device): 4958 """ Discovers and pairs given device. Automatically connects too. 4959 4960 If any of the test expressions fail, it will raise an error so only call 4961 this function as a setup for a test. 4962 """ 4963 self.assert_on_fail(self.test_device_set_discoverable(device, True)) 4964 self.assert_on_fail(self.test_discover_device(device.address)) 4965 self.assert_on_fail( 4966 self.test_pairing(device.address, device.pin, trusted=True)) 4967 4968 def identify_platform_failure_reasons(self): 4969 """ Identifies platform failure reasons to watch for in logs """ 4970 s = self.bluetooth_facade.get_bt_usb_disconnect_str() 4971 if s: 4972 COMMON_FAILURES[s] = 'USB disconnect detected' 4973 4974 def clean_bluetooth_kernel_log(self, level_name): 4975 """Remove Bluetooth kernel logs in /var/log/messages with equal or lower 4976 prioity than level_name 4977 4978 @param level_name: name of the log level, e.x. 'INFO', 'DEBUG'... 4979 """ 4980 self.bluetooth_facade.clean_bluetooth_kernel_log( 4981 KERNEL_LOG_LEVEL[level_name]) 4982 4983 def run_once(self, *args, **kwargs): 4984 """This method should be implemented by children classes. 4985 4986 Typically, the run_once() method would look like: 4987 4988 factory = remote_facade_factory.RemoteFacadeFactory(host) 4989 self.bluetooth_facade = factory.create_bluetooth_facade(self.floss) 4990 4991 self.test_bluetoothd_running() 4992 # ... 4993 # invoke more self.test_xxx() tests. 4994 # ... 4995 4996 if self.fails: 4997 raise error.TestFail(self.fails) 4998 4999 """ 5000 raise NotImplementedError 5001 5002 5003 def cleanup_bt_test(self, test_state='END'): 5004 """Clean up bluetooth adapter tests. 5005 5006 @param test_state: string describing the requested clear is for 5007 a new test(NEW), the middle of the test(MID), 5008 or the end of the test(END). 5009 """ 5010 5011 if test_state == 'END': 5012 # Disable all the bluetooth debug logs 5013 self.enable_disable_debug_log(enable=False) 5014 5015 # Re-enable cellular services 5016 self.enable_disable_cellular(enable=True) 5017 5018 # Re-enable ui 5019 self.enable_disable_ui(enable=True) 5020 5021 if hasattr(self, 'host'): 5022 # Stop btmon process 5023 self.host.run('pkill btmon || true') 5024 5025 #Stop tcpdump usbmon process 5026 self.host.run('pkill tcpdump || true') 5027 5028 5029 # Close the device properly if a device is instantiated. 5030 # Note: do not write something like the following statements 5031 # if self.devices[device_type]: 5032 # or 5033 # if bool(self.devices[device_type]): 5034 # Otherwise, it would try to invoke bluetooth_mouse.__nonzero__() 5035 # which just does not exist. 5036 for device_name, device_list in self.devices.items(): 5037 for device in device_list: 5038 if device is not None: 5039 device.Close() 5040 5041 # Power cycle BT device if we're in the middle of a test 5042 if test_state == 'MID': 5043 device.PowerCycle() 5044 5045 self.devices = dict() 5046 for device_type in SUPPORTED_DEVICE_TYPES: 5047 self.devices[device_type] = list() 5048 5049 # Called only by test.test 5050 def cleanup(self): 5051 """Cleanup test.test instance""" 5052 5053 self.cleanup_bt_test() 5054