1# Lint as: python2, python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Server side bluetooth adapter subtests.""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11 12from datetime import datetime, timedelta 13import errno 14import functools 15import six.moves.http_client 16import inspect 17import logging 18import multiprocessing 19import os 20import re 21import socket 22import threading 23import time 24 25import common 26from autotest_lib.client.bin import utils 27from autotest_lib.client.bin.input import input_event_recorder as recorder 28from autotest_lib.client.common_lib import error 29from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket 30from autotest_lib.client.cros.chameleon import chameleon 31from autotest_lib.server.cros.bluetooth import bluetooth_peer_update 32from autotest_lib.server.cros.bluetooth import bluetooth_test_utils 33from autotest_lib.server import test 34 35from autotest_lib.client.bin.input.linux_input import ( 36 BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, 37 REL_WHEEL_HI_RES, KEY_PLAYCD, KEY_PAUSECD, KEY_STOPCD, KEY_NEXTSONG, 38 KEY_PREVIOUSSONG) 39from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import ( 40 GATT_ClientFacade, GATT_Application, GATT_HIDApplication) 41from autotest_lib.server.cros.multimedia import remote_facade_factory 42import six 43from six.moves import map 44from six.moves import range 45from six.moves import zip 46 47 48Event = recorder.Event 49 50CHIPSET_TO_VIDPID = { 'BRCM-4354':[('0x002d','0x4354')], 51 'MVL-8897':[('0x02df','0x912d')], 52 'MVL-8997':[('0x1b4b','0x2b42')], 53 'QCA-9462': [('0x168c', '0x0034')], 54 'QCA-6174A-5':[('0x168c','0x003e')], 55 'QCA-6174A-3':[('0x271','0x050a')], # UART 56 'Intel-AX200':[('0x8086', '0x2723')], # CcP2 57 'Intel-AX201':[('0x8086','0x02f0')], # HrP2 58 'Intel-AC9260':[('0x8086','0x2526')], # ThP2 59 'Intel-AC9560':[('0x8086','0x31dc'), # JfP2 60 ('0x8086','0x9df0')], 61 'Intel-AC7260':[('0x8086','0x08b1'), # WP2 62 ('0x8086','0x08b2')], 63 'Intel-AC7265':[('0x8086','0x095a'), # StP2 64 ('0x8086','0x095b')], 65 'Realtek-RTL8822C-USB':[('0x10ec','0xc822')] } 66 67# We have a number of chipsets that are no longer supported. Known issues 68# related to firmware will be ignored on these devices (b/169328792). 69UNSUPPORTED_CHIPSETS = [ 70 'BRCM-4354', 'MVL-8897', 'MVL-8997', 'Intel-AC7260', 'Intel-AC7265' 71] 72 73# Location of data traces relative to this (bluetooth_adapter_tests.py) file 74BT_ADAPTER_TEST_PATH = os.path.dirname(__file__) 75TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard') 76 77RESUME_DELTA = -5 78 79# Delay binding the methods since host is only available at run time. 80SUPPORTED_DEVICE_TYPES = { 81 'MOUSE': lambda btpeer: btpeer.get_bluetooth_hid_mouse, 82 'KEYBOARD': lambda btpeer: btpeer.get_bluetooth_hid_keyboard, 83 'BLE_MOUSE': lambda btpeer: btpeer.get_ble_mouse, 84 'BLE_KEYBOARD': lambda btpeer: btpeer.get_ble_keyboard, 85 # Tester allows us to test DUT's discoverability, etc. from a peer 86 'BLUETOOTH_TESTER': lambda btpeer: btpeer.get_bluetooth_tester, 87 # This is a base object that does not emulate any Bluetooth device. 88 # This object is preferred when only a pure XMLRPC server is needed 89 # on the btpeer host, e.g., to perform servod methods. 90 'BLUETOOTH_BASE': lambda btpeer: btpeer.get_bluetooth_base, 91 # on the chameleon host, e.g., to perform servod methods. 92 'BLUETOOTH_BASE': lambda chameleon: chameleon.get_bluetooth_base, 93 # A phone device that supports Bluetooth 94 'BLE_PHONE': lambda chameleon: chameleon.get_ble_phone, 95 # A Bluetooth audio device emulating a headphone 96 'BLUETOOTH_AUDIO': lambda chameleon: chameleon.get_bluetooth_audio, 97} 98 99COMMON_FAILURES = { 100 'Freeing adapter /org/bluez/hci': 'adapter_freed', 101 '/var/spool/crash/bluetoothd': 'bluetoothd_crashed', 102} 103 104# TODO(b/150898182) - Don't run some tests on tablet form factors 105# This list was generated by looking for tablet models on Goldeneye and removing 106# the ones that were not launched 107TABLET_MODELS = ['kakadu', 'kodama', 'krane', 'dru', 'druwl', 'dumo'] 108 109# TODO(b/161005264) - Some tests rely on software rotation to pass, so we must 110# know which models don't use software rotation. Use a static list until we can 111# query the bluez API instead. Extended advertising is supported on platforms 112# on 4.19 and 5.4, with HrP2, JfP2, CcP2, RTL8822C, or QCN3991 chipsets. 113EXT_ADV_MODELS = ['ezkinil', 'trembyle', 'drawcia', 'drawlat', 'drawman', 114 'maglia', 'magolor', 'sarien', 'arcada', 'akemi', 115 'drallion', 'drallion360', 'hatch', 'stryke', 'helios', 116 'dragonair', 'dratini', 'duffy', 'jinlon', 'kaisa', 117 'kindred', 'kled', 'puff', 'kohaku', 'nightfury', 'morphius', 118 'lazor', 'trogdor'] 119 120# TODO(b/158336394) Realtek: Powers down during suspend due to high power usage 121# during S3. 122# TODO(b/168152910) Marvell: Powers down during suspend due to flakiness when 123# entering suspend. This will also skip the tests 124# for Veyron (which don't power down right now) but 125# reconnect tests are still enabled for that platform 126# to check for suspend stability. 127SUSPEND_POWER_DOWN_CHIPSETS = ['Realtek-RTL8822C-USB', 'MVL-8897', 'MVL-8997'] 128 129 130def method_name(): 131 """Get the method name of a class. 132 133 This function is supposed to be invoked inside a class and will 134 return current method name who invokes this function. 135 136 @returns: the string of the method name inside the class. 137 """ 138 return inspect.getouterframes(inspect.currentframe())[1][3] 139 140 141def _run_method(method, method_name, *args, **kwargs): 142 """Run a target method and capture exceptions if any. 143 144 This is just a wrapper of the target method so that we do not need to 145 write the exception capturing structure repeatedly. The method could 146 be either a device method or a facade method. 147 148 @param method: the method to run 149 @param method_name: the name of the method 150 151 @returns: the return value of target method() if successful. 152 False otherwise. 153 154 """ 155 result = False 156 try: 157 result = method(*args, **kwargs) 158 except Exception as e: 159 logging.error('%s: %s', method_name, e) 160 except: 161 logging.error('%s: unexpected error', method_name) 162 return result 163 164 165def get_bluetooth_emulated_device(btpeer, device_type): 166 """Get the bluetooth emulated device object. 167 168 @param btpeer: the Bluetooth peer device 169 @param device_type : the bluetooth device type, e.g., 'MOUSE' 170 171 @returns: the bluetooth device object 172 173 """ 174 175 def _retry_device_method(method_name, legal_falsy_values=[]): 176 """retry the emulated device's method. 177 178 The method is invoked as device.xxxx() e.g., device.GetAdvertisedName(). 179 180 Note that the method name string is provided to get the device's actual 181 method object at run time through getattr(). The rebinding is required 182 because a new device may have been created previously or during the 183 execution of fix_serial_device(). 184 185 Given a device's method, it is not feasible to get the method name 186 through __name__ attribute. This limitation is due to the fact that 187 the device is a dotted object of an XML RPC server proxy. 188 As an example, with the method name 'GetAdvertisedName', we could 189 derive the correspoinding method device.GetAdvertisedName. On the 190 contrary, given device.GetAdvertisedName, it is not feasible to get the 191 method name by device.GetAdvertisedName.__name__ 192 193 Also note that if the device method fails, we would try remediation 194 step and retry the device method. The remediation steps are 195 1) re-creating the serial device. 196 2) reset (powercycle) the bluetooth dongle. 197 3) reboot Bluetooth peer. 198 If the device method still fails after these steps, we fail the test 199 200 The default values exist for uses of this function before the options 201 were added, ideally we should change zero_ok to False. 202 203 @param method_name: the string of the method name. 204 @param legal_falsy_values: Values that are falsy but might be OK. 205 206 @returns: the result returned by the device's method if the call was 207 successful 208 209 @raises: TestError if the devices's method fails or if repair of 210 peripheral kit fails 211 212 """ 213 214 action_table = [('recreate' , 'Fixing the serial device'), 215 ('reset', 'Power cycle the peer device'), 216 ('reboot', 'Reboot the chamleond host')] 217 218 for i, (action, description) in enumerate(action_table): 219 logging.info('Attempt %s : %s ', i+1, method_name) 220 221 result = _run_method(getattr(device, method_name), method_name) 222 if _is_successful(result, legal_falsy_values): 223 return result 224 225 logging.error('%s failed the %s time. Attempting to %s', 226 method_name,i,description) 227 if not fix_serial_device(btpeer, device, action): 228 logging.info('%s failed', description) 229 else: 230 logging.info('%s successful', description) 231 232 #try it last time after fix it by last action 233 result = _run_method(getattr(device, method_name), method_name) 234 if _is_successful(result, legal_falsy_values): 235 return result 236 237 raise error.TestError('Failed to execute %s. Bluetooth peer device is' 238 'not working' % method_name) 239 240 241 if device_type not in SUPPORTED_DEVICE_TYPES: 242 raise error.TestError('The device type is not supported: %s', 243 device_type) 244 245 # Get the bluetooth device object and query some important properties. 246 device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)() 247 248 # Get some properties of the kit 249 # NOTE: Strings updated here must be kept in sync with Btpeer. 250 device._capabilities = _retry_device_method('GetCapabilities') 251 device._transports = device._capabilities["CAP_TRANSPORTS"] 252 device._is_le_only = ("TRANSPORT_LE" in device._transports and 253 len(device._transports) == 1) 254 device._has_pin = device._capabilities["CAP_HAS_PIN"] 255 device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"] 256 257 _retry_device_method('Init') 258 logging.info('device type: %s', device_type) 259 260 device.name = _retry_device_method('GetAdvertisedName') 261 logging.info('device name: %s', device.name) 262 263 device.address = _retry_device_method('GetLocalBluetoothAddress') 264 logging.info('address: %s', device.address) 265 266 pin_falsy_values = [] if device._has_pin else [None] 267 device.pin = _retry_device_method('GetPinCode', pin_falsy_values) 268 logging.info('pin: %s', device.pin) 269 270 class_falsy_values = [None] if device._is_le_only else [0] 271 272 # Class of service is None for LE-only devices. Don't fail or parse it. 273 device.class_of_service = _retry_device_method('GetClassOfService', 274 class_falsy_values) 275 if device._is_le_only: 276 parsed_class_of_service = device.class_of_service 277 else: 278 parsed_class_of_service = "0x%04X" % device.class_of_service 279 logging.info('class of service: %s', parsed_class_of_service) 280 281 device.class_of_device = _retry_device_method('GetClassOfDevice', 282 class_falsy_values) 283 # Class of device is None for LE-only devices. Don't fail or parse it. 284 if device._is_le_only: 285 parsed_class_of_device = device.class_of_device 286 else: 287 parsed_class_of_device = "0x%04X" % device.class_of_device 288 logging.info('class of device: %s', parsed_class_of_device) 289 290 device.device_type = _retry_device_method('GetDeviceType') 291 logging.info('device type: %s', device.device_type) 292 293 device.authentication_mode = None 294 if not device._is_le_only: 295 device.authentication_mode = _retry_device_method('GetAuthenticationMode') 296 logging.info('authentication mode: %s', device.authentication_mode) 297 298 device.port = _retry_device_method('GetPort') 299 logging.info('serial port: %s\n', device.port) 300 301 return device 302 303 304def recreate_serial_device(device): 305 """Create and connect to a new serial device. 306 307 @param device: the bluetooth HID device 308 309 @returns: True if the serial device is re-created successfully. 310 311 """ 312 logging.info('Remove the old serial device and create a new one.') 313 if device is not None: 314 try: 315 device.Close() 316 except: 317 logging.error('failed to close the serial device.') 318 return False 319 try: 320 device.CreateSerialDevice() 321 return True 322 except: 323 logging.error('failed to invoke CreateSerialDevice.') 324 return False 325 326 327def _check_device_init(device, operation): 328 # Check if the serial device could initialize, connect, and 329 # enter command mode correctly. 330 logging.info('Checking device status...') 331 if not _run_method(device.Init, 'Init'): 332 logging.info('device.Init: failed after %s', operation) 333 return False 334 if not device.CheckSerialConnection(): 335 logging.info('device.CheckSerialConnection: failed after %s', operation) 336 return False 337 if not _run_method(device.EnterCommandMode, 'EnterCommandMode'): 338 logging.info('device.EnterCommandMode: failed after %s', operation) 339 return False 340 logging.info('The device is created successfully after %s.', operation) 341 return True 342 343def _reboot_btpeer(btpeer, device): 344 """ Reboot Bluetooth peer device. 345 346 Also power cycle the device since reboot may not do that..""" 347 348 # Chameleond fizz hosts should have write protect removed and 349 # set_gbb_flags set to 0 to minimize boot time 350 REBOOT_SLEEP_SECS = 10 351 RESET_SLEEP_SECS = 1 352 353 # Close the bluetooth peripheral device and reboot the chameleon board. 354 device.Close() 355 logging.info("Powercycling the device") 356 device.PowerCycle() 357 time.sleep(RESET_SLEEP_SECS) 358 logging.info('rebooting Bluetooth peer...') 359 btpeer.reboot() 360 361 # Every btpeer reboot would take a bit more than REBOOT_SLEEP_SECS. 362 # Sleep REBOOT_SLEEP_SECS and then begin probing the btpeer board. 363 time.sleep(REBOOT_SLEEP_SECS) 364 return _check_device_init(device, 'reboot') 365 366def _reset_device_power(device): 367 """Power cycle the device.""" 368 RESET_SLEEP_SECS = 1 369 try: 370 if not device.PowerCycle(): 371 logging.info('device.PowerCycle() failed') 372 return False 373 except: 374 logging.error('exception in device.PowerCycle') 375 else: 376 logging.info('device powercycled') 377 time.sleep(RESET_SLEEP_SECS) 378 return _check_device_init(device, 'reset') 379 380def _is_successful(result, legal_falsy_values=[]): 381 """Is the method result considered successful? 382 383 Some method results, for example that of class_of_service, may be 0 which is 384 considered a valid result. Occassionally, None is acceptable. 385 386 The default values exist for uses of this function before the options were 387 added, ideally we should change zero_ok to False. 388 389 @param result: a method result 390 @param legal_falsy_values: Values that are falsy but might be OK. 391 392 @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if 393 result is None and none_ok. 394 """ 395 truthiness_of_result = bool(result) 396 return truthiness_of_result or result in legal_falsy_values 397 398 399def _flag_common_failures(instance): 400 """Checks if a common failure has occurred during the test run 401 402 Scans system logs for known signs of failure. If a failure is discovered, 403 it is added to the test results, to make it easier to identify common root 404 causes from Stainless 405 """ 406 407 for fail_tag, fail_log in COMMON_FAILURES.items(): 408 if instance.bluetooth_facade.messages_find(fail_tag): 409 logging.error('Detected failure tag: %s', fail_tag) 410 # We mark this instance's results with the discovered failure 411 if type(instance.results) is dict: 412 instance.results[fail_log] = True 413 414 415def fix_serial_device(btpeer, device, operation='reset'): 416 """Fix the serial device. 417 418 This function tries to fix the serial device by 419 (1) re-creating a serial device, or 420 (2) power cycling the usb port to which device is connected 421 (3) rebooting the Bluetooth peeer 422 423 Argument operation determine which of the steps above are perform 424 425 Note that rebooting the btpeer board or resetting the device will remove 426 the state on the peripheral which might cause test failures. Please use 427 reset/reboot only before or after a test. 428 429 @param btpeer: the Bluetooth peer 430 @param device: the bluetooth device. 431 @param operation: Recovery operation to perform 'recreate/reset/reboot' 432 433 @returns: True if the serial device is fixed. False otherwise. 434 435 """ 436 437 if operation == 'recreate': 438 # Check the serial connection. Fix it if needed. 439 if device.CheckSerialConnection(): 440 # The USB serial connection still exists. 441 # Re-connection suffices to solve the problem. The problem 442 # is usually caused by serial port change. For example, 443 # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1. 444 logging.info('retry: creating a new serial device...') 445 return recreate_serial_device(device) 446 else: 447 # Recreate the bluetooth peer device 448 return _check_device_init(device, operation) 449 450 elif operation == 'reset': 451 # Powercycle the USB port where the bluetooth peer device is connected. 452 # RN-42 and RN-52 share the same vid:pid so both will be powercycled. 453 # This will only work on fizz host with write protection removed. 454 # Note that the state on the device will be lost. 455 return _reset_device_power(device) 456 457 elif operation == 'reboot': 458 # Reboot the Bluetooth peer device. 459 # The device is power cycled before rebooting Bluetooth peer device 460 return _reboot_btpeer(btpeer, device) 461 462 else: 463 logging.error('fix_serial_device Invalid operation %s', operation) 464 return False 465 466 467def retry(test_method, instance, *args, **kwargs): 468 """Execute the target facade test_method(). Retry if failing the first time. 469 470 A test_method is something like self.test_xxxx() in BluetoothAdapterTests, 471 e.g., BluetoothAdapterTests.test_bluetoothd_running(). 472 473 @param test_method: the test method to retry 474 475 @returns: True if the return value of test_method() is successful. 476 False otherwise. 477 478 """ 479 if _is_successful(_run_method(test_method, test_method.__name__, 480 instance, *args, **kwargs)): 481 return True 482 483 # Try to fix the serial device if applicable. 484 logging.error('%s failed at the 1st time: (%s)', test_method.__name__, 485 str(instance.results)) 486 487 # If this test does not use any attached serial device, just re-run 488 # the test. 489 logging.info('%s: retry the 2nd time.', test_method.__name__) 490 time.sleep(1) 491 492 493 if not hasattr(instance, 'use_btpeer'): 494 return _is_successful(_run_method(test_method, test_method.__name__, 495 instance, *args, **kwargs)) 496 for device_type in SUPPORTED_DEVICE_TYPES: 497 for device in getattr(instance, 'devices')[device_type]: 498 #fix_serial_device in 'recreate' mode doesn't require btpeer 499 #so just pass None for convenient. 500 if not fix_serial_device(None, device, "recreate"): 501 return False 502 503 logging.info('%s: retry the 2nd time.', test_method.__name__) 504 return _is_successful(_run_method(test_method, test_method.__name__, 505 instance, *args, **kwargs)) 506 507 508def test_retry_and_log(test_method_or_retry_flag, 509 messages_start=True, 510 messages_stop=True): 511 """A decorator that logs test results, collects error messages, and retries 512 on request. 513 514 @param test_method_or_retry_flag: either the test_method or a retry_flag. 515 There are some possibilities of this argument: 516 1. the test_method to conduct and retry: should retry the test_method. 517 This occurs with 518 @test_retry_and_log 519 2. the retry flag is True. Should retry the test_method. 520 This occurs with 521 @test_retry_and_log(True) 522 3. the retry flag is False. Do not retry the test_method. 523 This occurs with 524 @test_retry_and_log(False) 525 526 @param messages_start: Start collecting messages before running the test 527 @param messages_stop: Stop collecting messages after running the test and 528 analyze the results. 529 530 @returns: a wrapper of the test_method with test log. The retry mechanism 531 would depend on the retry flag. 532 533 """ 534 535 def decorator(test_method): 536 """A decorator wrapper of the decorated test_method. 537 538 @param test_method: the test method being decorated. 539 540 @returns the wrapper of the test method. 541 542 """ 543 @functools.wraps(test_method) 544 def wrapper(instance, *args, **kwargs): 545 """A wrapper of the decorated method. 546 547 @param instance: an BluetoothAdapterTests instance 548 549 @returns the result of the test method 550 551 """ 552 instance.results = None 553 fail_msg = None 554 test_result = False 555 should_raise = hasattr(instance, 'fail_fast') and instance.fail_fast 556 557 instance.last_test_method = test_method.__name__ 558 syslog_captured = False 559 560 try: 561 if messages_start: 562 # Grab /var/log/messages output during test run 563 instance.bluetooth_facade.messages_start() 564 565 if callable(test_method_or_retry_flag 566 ) or test_method_or_retry_flag: 567 test_result = retry(test_method, instance, *args, **kwargs) 568 else: 569 test_result = test_method(instance, *args, **kwargs) 570 571 if messages_stop: 572 syslog_captured = instance.bluetooth_facade.messages_stop() 573 574 if test_result: 575 logging.info('[*** passed: {}]'.format( 576 test_method.__name__)) 577 else: 578 if syslog_captured: 579 _flag_common_failures(instance) 580 fail_msg = '[--- failed: {} ({})]'.format( 581 test_method.__name__, str(instance.results)) 582 logging.error(fail_msg) 583 instance.fails.append(fail_msg) 584 # Log TestError and TestNA and let the quicktest wrapper catch it. 585 # Those errors should skip out of the testcase entirely. 586 except error.TestNAError as e: 587 fail_msg = '[--- TESTNA {} ({})]'.format( 588 test_method.__name__, str(e)) 589 logging.error(fail_msg) 590 raise 591 except error.TestError as e: 592 fail_msg = '[--- ERROR {} ({})]'.format( 593 test_method.__name__, str(e)) 594 logging.error(fail_msg) 595 raise 596 except error.TestFail as e: 597 fail_msg = '[--- failed {} ({})]'.format( 598 test_method.__name__, str(e)) 599 logging.error(fail_msg) 600 instance.fails.append(fail_msg) 601 should_raise = True 602 603 # Check whether we should fail fast 604 if fail_msg and should_raise: 605 logging.info('Fail fast') 606 raise error.TestFail(instance.fails) 607 608 return test_result 609 610 return wrapper 611 612 if callable(test_method_or_retry_flag): 613 # If the decorator function comes with no argument like 614 # @test_retry_and_log 615 return decorator(test_method_or_retry_flag) 616 else: 617 # If the decorator function comes with an argument like 618 # @test_retry_and_log(False) 619 return decorator 620 621 622def test_case_log(method): 623 """A decorator for test case methods. 624 625 The main purpose of this decorator is to display the test case name 626 in the test log which looks like 627 628 <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...> 629 630 @param method: the test case method to decorate. 631 632 @returns: a wrapper function of the decorated method. 633 634 """ 635 @functools.wraps(method) 636 def wrapper(instance, *args, **kwargs): 637 """Log the name of the wrapped method before execution""" 638 logging.info('\n<... %s ...>', method.__name__) 639 method(instance, *args, **kwargs) 640 return wrapper 641 642 643class BluetoothAdapterTests(test.test): 644 """Server side bluetooth adapter tests. 645 646 This test class tries to thoroughly verify most of the important work 647 states of a bluetooth adapter. 648 649 The various test methods are supposed to be invoked by actual autotest 650 tests such as server/cros/site_tests/bluetooth_Adapter*. 651 652 """ 653 version = 1 654 ADAPTER_ACTION_SLEEP_SECS = 1 655 ADAPTER_PAIRING_TIMEOUT_SECS = 60 656 ADAPTER_CONNECTION_TIMEOUT_SECS = 30 657 # Wait after connect for input device to be ready for use 658 ADAPTER_HID_INPUT_DELAY = 5 659 ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30 660 ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3 661 ADAPTER_DISCOVER_TIMEOUT_SECS = 60 # 30 seconds too short sometimes 662 ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1 663 ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30 664 ADAPTER_WAKE_ENABLE_TIMEOUT_SECS = 30 665 666 ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10 667 ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1 668 669 HID_REPORT_SLEEP_SECS = 1 670 671 672 DEFAULT_START_DELAY_SECS = 0 673 DEFAULT_HOLD_INTERVAL_SECS = 10 674 DEFAULT_HOLD_TIMEOUT_SECS = 60 675 DEFAULT_HOLD_SLEEP_SECS = 1 676 677 # Default suspend time in seconds for suspend resume. 678 SUSPEND_TIME_SECS=10 679 SUSPEND_ENTER_SECS=10 680 RESUME_TIME_SECS=30 681 RESUME_INTERNAL_TIMEOUT_SECS = 180 682 683 # Minimum RSSI required for peer devices during testing 684 MIN_RSSI = -70 685 686 # hci0 is the default hci device if there is no external bluetooth dongle. 687 EXPECTED_HCI = 'hci0' 688 689 CLASS_OF_SERVICE_MASK = 0xFFE000 690 CLASS_OF_DEVICE_MASK = 0x001FFF 691 692 # Constants about advertising. 693 DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 181.25 694 DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 181.25 695 ADVERTISING_INTERVAL_UNIT = 0.625 696 697 # Error messages about advertising dbus methods. 698 ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = ( 699 'org.bluez.Error.NotPermitted: Maximum advertisements reached') 700 ERROR_INVALID_ADVERTISING_INTERVALS = ( 701 'org.bluez.Error.InvalidArguments: Invalid arguments') 702 703 # Supported profiles by chrome os. 704 SUPPORTED_UUIDS = { 705 'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb', 706 'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb', 707 'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb', 708 'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb', 709 'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'} 710 711 # Board list for name/ID test check. These devices don't need to be tested 712 REFERENCE_BOARDS = [ 713 'rambi', 'nyan', 'oak', 'reef', 'yorp', 'bip', 'volteer', 714 'volteer2' 715 ] 716 717 # Path for btmon logs 718 BTMON_DIR_LOG_PATH = '/var/log/btmon' 719 720 # Path for usbmon logs 721 USBMON_DIR_LOG_PATH = '/var/log/usbmon' 722 723 # The agent capability of various device types. 724 AGENT_CAPABILITY = { 725 'BLUETOOTH_AUDIO': 'NoInputNoOutput', 726 } 727 728 def assert_on_fail(self, result, raiseNA=False): 729 """ If the called function returns a false-like value, raise an error. 730 731 Call test methods (i.e. with @test_retry_and_log) wrapped with this 732 function and failures will raise instead of continuing the test. 733 734 For example: 735 self.assert_on_fail(self.test_pairing(...)) 736 737 @param result: Result of test method called. 738 @param raiseNA: Whether to raise TestNAError instead of TestFail 739 740 @raises error.TestNAError 741 @raises error.TestFail 742 """ 743 if not result: 744 failure_msg = 'Assert on fail: {}'.format(self.last_test_method) 745 logging.error(failure_msg) 746 if raiseNA: 747 raise error.TestNAError(failure_msg) 748 else: 749 raise error.TestFail(failure_msg) 750 751 752 # TODO(b/131170539) remove when sarien/arcada no longer have _signed 753 # postfix 754 def get_base_platform_name(self): 755 """Returns the DUT platform name 756 757 If the DUT is a DVT device, _signed or _unsigned may be appended 758 to the device name, which we should ignore in our BT tests 759 760 @returns: String name of the DUT's platform with _signed or 761 _unsigned removed 762 """ 763 764 platform = self.host.get_platform() 765 766 return platform.replace('_signed', '').replace('_unsigned', '') 767 768 769 def group_btpeers_type(self): 770 """Group all Bluetooth peers by the type of their detected device.""" 771 772 # Use previously created btpeer_group instead of creating new 773 if len(self.btpeer_group_copy) > 0: 774 logging.info('Using previously created btpeer group') 775 for device_type in SUPPORTED_DEVICE_TYPES: 776 self.btpeer_group[device_type] = \ 777 self.btpeer_group_copy[device_type][:] 778 return 779 780 # Create new btpeer_group 781 for device_type in SUPPORTED_DEVICE_TYPES: 782 self.btpeer_group[device_type] = list() 783 # Create copy of btpeer_group 784 self.btpeer_group_copy[device_type] = list() 785 786 for idx, btpeer in enumerate(self.host.btpeer_list): 787 for device_type,gen_device_func in SUPPORTED_DEVICE_TYPES.items(): 788 try: 789 device = gen_device_func(btpeer)() 790 if device.CheckSerialConnection(): 791 self.btpeer_group[device_type].append(btpeer) 792 logging.info('%d-th btpeer find device %s', \ 793 idx, device_type) 794 # Create copy of btpeer_group 795 self.btpeer_group_copy[device_type].append(btpeer) 796 except: 797 logging.debug('Error with initializing %s on %d-th' 798 'btpeer', device_type, idx) 799 if len(self.btpeer_group[device_type]) == 0: 800 logging.error('No device is detected on %d-th btpeer', idx) 801 802 logging.debug("self.bt_group is %s",self.btpeer_group) 803 804 805 def wait_for_device(self, device, timeout=10): 806 """Waits for device to become available again 807 808 We reset raspberry pi peer between tests. This method helps us wait to 809 prevent us from trying to use the device before it comes back up again. 810 811 @param device: proxy object of peripheral device 812 """ 813 814 def is_device_ready(): 815 """Tries to use a service of the device 816 817 @returns: True if device is available to provide service 818 False otherwise 819 """ 820 821 try: 822 # Call a simple (fast) function to determine if device is online 823 # and reachable. If we can query this property, we know the 824 # device is available for us to use 825 getattr(device, 'GetCapabilities')() 826 827 except Exception as e: 828 return False 829 830 return True 831 832 833 try: 834 utils.poll_for_condition(condition=is_device_ready, 835 desc='wait_for_device', 836 timeout=timeout) 837 838 except utils.TimeoutError as e: 839 raise error.TestError('Peer is not available after waiting') 840 841 842 def clear_raspi_device(self, device): 843 """Clears a device on a raspi peer by resetting bluetooth stack 844 845 @param device: proxy object of peripheral device 846 """ 847 848 try: 849 device.ResetStack() 850 851 except socket.error as e: 852 # Ignore conn reset, expected during stack reset 853 if e.errno != errno.ECONNRESET: 854 raise 855 856 except chameleon.ChameleonConnectionError as e: 857 # Ignore chameleon conn reset, expected during stack reset 858 if str(errno.ECONNRESET) not in str(e): 859 raise 860 861 except six.moves.http_client.BadStatusLine as e: 862 # BadStatusLine occurs occasionally when chameleon 863 # is restarted. We ignore it here 864 logging.error('Ignoring badstatusline exception') 865 pass 866 867 # Catch generic Fault exception by rpc server, ignore 868 # method not available as it indicates platform didn't 869 # support method and that's ok 870 except Exception as e: 871 if not (e.__class__.__name__ == 'Fault' and 872 'is not supported' in str(e)): 873 raise 874 875 # Ensure device is back online before continuing 876 self.wait_for_device(device, timeout=30) 877 878 def device_set_powered(self, device, powered): 879 """Set raspi BT powered state. 880 881 @param powered: Powered state to set on Raspi. 882 """ 883 if powered: 884 device.AdapterPowerOn() 885 else: 886 device.AdapterPowerOff() 887 888 def get_device_rasp(self, device_num, on_start=True): 889 """Get all bluetooth device objects from Bluetooth peer devices 890 This method should be called only after group_btpeers_type 891 @param device_num : dict of {device_type:number}, to specify the number 892 of device needed for each device_type. 893 894 @param on_start: boolean describing whether the requested clear is for a 895 new test, or in the middle of a current one 896 897 @returns: True if Success. 898 """ 899 900 logging.info("in get_device_rasp %s onstart %s", device_num, on_start) 901 total_num_devices = sum(device_num.values()) 902 if total_num_devices > len(self.host.btpeer_list): 903 logging.error( 904 'Total number of devices %s is greater than the' 905 ' number of Bluetooth peers %s', total_num_devices, 906 len(self.host.btpeer_list)) 907 return False 908 909 for device_type, number in device_num.items(): 910 total_num_devices += number 911 if len(self.btpeer_group[device_type]) < number: 912 logging.error('Number of Bluetooth peers with device type' 913 '%s is %d, which is less then needed %d', device_type, 914 len(self.btpeer_group[device_type]), number) 915 return False 916 917 for btpeer in self.btpeer_group[device_type][:number]: 918 logging.info("getting emulated %s", device_type) 919 device = self.reset_device(btpeer, device_type, on_start) 920 921 self.devices[device_type].append(device) 922 923 # Remove this btpeer from btpeer_group since it is already 924 # configured as a specific device 925 for temp_device in SUPPORTED_DEVICE_TYPES: 926 if btpeer in self.btpeer_group[temp_device]: 927 self.btpeer_group[temp_device].remove(btpeer) 928 929 return True 930 931 932 def get_device(self, device_type, on_start=True): 933 """Get the bluetooth device object. 934 935 @param device_type : the bluetooth device type, e.g., 'MOUSE' 936 937 @param on_start: boolean describing whether the requested clear is for a 938 new test, or in the middle of a current one 939 940 @returns: the bluetooth device object 941 942 """ 943 944 self.devices[device_type].append( 945 self.reset_device(self.host.btpeer, device_type, on_start)) 946 947 return self.devices[device_type][-1] 948 949 950 def reset_device(self, peer, device_type, clear_device=True): 951 """Reset the peer device in order to be used as a different type. 952 953 @param peer: the peer device to reset with new device type 954 @param device_type : the new bluetooth device type, e.g., 'MOUSE' 955 @param clear_device: whether to clear the device state 956 957 @returns: the bluetooth device object 958 959 """ 960 device = get_bluetooth_emulated_device(peer, device_type) 961 962 # Re-fresh device to clean state if test is starting 963 if clear_device: 964 self.clear_raspi_device(device) 965 966 try: 967 # Tell generic chameleon to bind to this device type 968 device.SpecifyDeviceType(device_type) 969 970 # Catch generic Fault exception by rpc server, ignore method not 971 # available as it indicates platform didn't support method and that's 972 # ok 973 except Exception as e: 974 if not (e.__class__.__name__ == 'Fault' and 975 'is not supported' in str(e)): 976 logging.error("got exception %s", str(e)) 977 raise 978 979 return device 980 981 982 def is_device_available(self, btpeer, device_type): 983 """Determines if the named device is available on the linked peer 984 985 @param device_type: the bluetooth HID device type, e.g., 'MOUSE' 986 987 @returns: True if it is able to resolve the device, false otherwise 988 """ 989 990 device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)() 991 try: 992 # The proxy prevents us from checking if the object is None directly 993 # so instead we call a fast method that any peripheral must support. 994 # This will fail if the object over the proxy doesn't exist 995 getattr(device, 'GetCapabilities')() 996 997 except Exception as e: 998 return False 999 1000 return True 1001 1002 1003 def list_devices_available(self): 1004 """Queries which devices are available on btpeer(s) 1005 1006 @returns: dict mapping HID device types to number of supporting peers 1007 available, e.g. {'MOUSE':1, 'KEYBOARD':1} 1008 """ 1009 devices_available = {} 1010 for device_type in SUPPORTED_DEVICE_TYPES: 1011 for btpeer in self.host.btpeer_list: 1012 if self.is_device_available(btpeer, device_type): 1013 devices_available[device_type] = \ 1014 devices_available.get(device_type, 0) + 1 1015 1016 logging.debug("devices available are %s", devices_available) 1017 return devices_available 1018 1019 1020 def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS): 1021 """Suspend the DUT for a while and then resume. 1022 1023 @param suspend_time: the suspend time in secs 1024 @raises errors.TestFail if the device reboots during suspend 1025 1026 """ 1027 boot_id = self.host.get_boot_id() 1028 suspend = self.suspend_async(suspend_time=suspend_time) 1029 start_time = self.bluetooth_facade.get_device_time() 1030 1031 # Give the system some time to enter suspend 1032 self.test_suspend_and_wait_for_sleep( 1033 suspend, sleep_timeout=self.SUSPEND_ENTER_SECS) 1034 1035 # Wait for resume - since we're not testing suspend itself, we are 1036 # lenient with the resume time here 1037 self.test_wait_for_resume(boot_id, 1038 suspend, 1039 resume_timeout=suspend_time, 1040 test_start_time=start_time) 1041 1042 1043 def reboot(self): 1044 """Reboot the DUT and recreate necessary processes and variables""" 1045 self.host.reboot() 1046 1047 # We need to recreate the bluetooth_facade after a reboot. 1048 # Delete the proxy first so it won't delete the old one, which 1049 # invokes disconnection, after creating the new one. 1050 if hasattr(self, 'factory'): 1051 del self.factory 1052 if hasattr(self, 'bluetooth_facade'): 1053 del self.bluetooth_facade 1054 if hasattr(self, 'input_facade'): 1055 del self.input_facade 1056 self.factory = remote_facade_factory.RemoteFacadeFactory( 1057 self.host, disable_arc=True, no_chrome=not self.start_browser) 1058 self.bluetooth_facade = self.factory.create_bluetooth_facade() 1059 self.input_facade = self.factory.create_input_facade() 1060 1061 # Re-enable debugging verbose since Chrome will set it to 1062 # default(disable). 1063 self.enable_disable_debug_log(enable=True) 1064 1065 # Re-disable cellular 1066 self.enable_disable_cellular(enable=False) 1067 1068 # Re-disable ui 1069 self.enable_disable_ui(enable=False) 1070 1071 self.start_new_btmon() 1072 self.start_new_usbmon() 1073 1074 1075 def _wait_till_condition_holds(self, func, method_name, 1076 timeout=DEFAULT_HOLD_TIMEOUT_SECS, 1077 sleep_interval=DEFAULT_HOLD_SLEEP_SECS, 1078 hold_interval=DEFAULT_HOLD_INTERVAL_SECS, 1079 start_delay=DEFAULT_START_DELAY_SECS): 1080 """ Wait for the func() to hold true for a period of time 1081 1082 1083 @param func: the function to wait for. 1084 @param method_name: the invoking class method. 1085 @param timeout: number of seconds to wait before giving up. 1086 @param sleep_interval: the interval in seconds to sleep between 1087 invoking func(). 1088 @param hold_interval: the interval in seconds for the condition to 1089 remain true 1090 @param start_delay: interval in seconds to wait before starting 1091 1092 @returns: True if the condition is met, 1093 False otherwise 1094 1095 """ 1096 if start_delay > 0: 1097 logging.debug('waiting for %s secs before checking %s',start_delay, 1098 method_name) 1099 time.sleep(start_delay) 1100 1101 try: 1102 utils.poll_till_condition_holds(condition=func, 1103 timeout=timeout, 1104 sleep_interval=sleep_interval, 1105 hold_interval = hold_interval, 1106 desc=('Waiting %s' % method_name)) 1107 return True 1108 except utils.TimeoutError as e: 1109 logging.error('%s: %s', method_name, e) 1110 except Exception as e: 1111 logging.error('%s: %s', method_name, e) 1112 err = 'bluetoothd possibly crashed. Check out /var/log/messages.' 1113 logging.error(err) 1114 except: 1115 logging.error('%s: unexpected error', method_name) 1116 return False 1117 1118 1119 def _wait_for_condition(self, func, method_name, 1120 timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS, 1121 sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS, 1122 start_delay=DEFAULT_START_DELAY_SECS): 1123 """Wait for the func() to become True. 1124 1125 @param func: the function to wait for. 1126 @param method_name: the invoking class method. 1127 @param timeout: number of seconds to wait before giving up. 1128 @param sleep_interval: the interval in seconds to sleep between 1129 invoking func(). 1130 @param start_delay: interval in seconds to wait before starting 1131 1132 @returns: True if the condition is met, 1133 False otherwise 1134 1135 """ 1136 1137 if start_delay > 0: 1138 logging.debug('waiting for %s secs before checking %s',start_delay, 1139 method_name) 1140 time.sleep(start_delay) 1141 1142 try: 1143 utils.poll_for_condition(condition=func, 1144 timeout=timeout, 1145 sleep_interval=sleep_interval, 1146 desc=('Waiting %s' % method_name)) 1147 return True 1148 except utils.TimeoutError as e: 1149 logging.error('%s: %s', method_name, e) 1150 except Exception as e: 1151 logging.error('%s: %s', method_name, e) 1152 err = 'bluetoothd possibly crashed. Check out /var/log/messages.' 1153 logging.error(err) 1154 except: 1155 logging.error('%s: unexpected error', method_name) 1156 return False 1157 1158 def ignore_failure(instance, test_method, *args, **kwargs): 1159 """ Wrapper to prevent a test_method failure from failing the test batch 1160 1161 Sometimes a test method needs to be used as a normal function, for its 1162 result. This wrapper prevent test_method failure being recorded in 1163 instance.fails and causing a failure of the quick test batch. 1164 1165 @param test_method: test_method 1166 @returns: result of the test_method 1167 """ 1168 1169 original_fails = instance.fails[:] 1170 test_result = test_method(*args, **kwargs) 1171 if not test_result: 1172 logging.info("%s failure is ignored",test_method.__name__) 1173 instance.fails = original_fails 1174 return test_result 1175 1176 1177 def start_agent(self, device): 1178 """Start the pairing agent of the device if applicable. 1179 1180 @param device: the peer device 1181 """ 1182 dev_type = device.GetDeviceType() 1183 capability = self.AGENT_CAPABILITY.get(dev_type) 1184 if capability: 1185 device.StartPairingAgent(capability) 1186 1187 1188 def stop_agent(self, device): 1189 """Stop the pairing agent of the device if applicable. 1190 1191 @param device: the peer device 1192 """ 1193 dev_type = device.GetDeviceType() 1194 capability = self.AGENT_CAPABILITY.get(dev_type) 1195 if capability: 1196 device.StopPairingAgent() 1197 1198 1199 # ------------------------------------------------------------------- 1200 # Adater standalone tests 1201 # ------------------------------------------------------------------- 1202 1203 1204 def service_exists(self, service_name): 1205 """Checks if a service exists on the DUT 1206 1207 @param service_name: name of the service 1208 1209 @returns: True if service status can be queried, else False 1210 """ 1211 1212 status_cmd = 'initctl status {}'.format(service_name) 1213 try: 1214 # Querying the status of a non-existent service throws an 1215 # AutoservRunError exception. If no exception is thrown, we know 1216 # the service exists 1217 self.host.run(status_cmd) 1218 1219 except error.AutoservRunError: 1220 return False 1221 1222 return True 1223 1224 1225 def service_enabled(self, service_name): 1226 """Checks if a service is running on the DUT 1227 1228 @param service_name: name of the service 1229 1230 @throws: AutoservRunError is thrown if there is no service with the 1231 provided name installed on the DUT. 1232 1233 @returns: True if service is currently running, else False 1234 """ 1235 1236 status_cmd = 'initctl status {}'.format(service_name) 1237 output = self.host.run(status_cmd).stdout 1238 1239 return 'start/running' in output 1240 1241 1242 def _initctl_services(self, services, command): 1243 """Use initctl to control service on the DUT 1244 1245 @param services: list of string service names 1246 @param command: initctl command on the services 1247 'start': to start the service 1248 'stop': to stop the service 1249 'restart': to restart the service 1250 1251 @returns: True if services were set successfully, else False 1252 """ 1253 for service in services: 1254 # Some platforms will not support all services. In these cases, 1255 # no need to fail, since they won't interfere with our tests 1256 if not self.service_exists(service): 1257 logging.debug('Service %s does not exist on DUT', service) 1258 continue 1259 1260 # A sample call to enable or disable a service is as follows: 1261 # "initctl stop modemfwd" 1262 if command in ['start', 'stop']: 1263 enable = command == 'start' 1264 if self.service_enabled(service) != enable: 1265 self.host.run('initctl {} {}'.format(command, service)) 1266 1267 if self.service_enabled(service) != enable: 1268 logging.error('Failed to set initctl service to state %d', 1269 enable) 1270 return False 1271 1272 if enable: 1273 logging.info('Service {} enabled'.format(service)) 1274 else: 1275 logging.info('Service {} disabled'.format(service)) 1276 1277 elif command == 'restart': 1278 if self.service_enabled(service): 1279 self.host.run('initctl {} {}'.format(command, service)) 1280 else: 1281 # Just start a stopped job. 1282 self.host.run('initctl {} {}'.format('start', service)) 1283 logging.info('Service {} restarted'.format(service)) 1284 1285 else: 1286 logging.error('unknown command {} on services {}'.format( 1287 command, services)) 1288 return False 1289 1290 return True 1291 1292 1293 def enable_disable_services(self, services, enable): 1294 """Enable or disable service on the DUT 1295 1296 @param services: list of string service names 1297 @param enable: True to enable services, False to disable 1298 1299 @returns: True if services were set successfully, else False 1300 """ 1301 command = 'start' if enable else 'stop' 1302 return self._initctl_services(services, command) 1303 1304 1305 def enable_disable_cellular(self, enable): 1306 """Enable cellular services on the DUT 1307 1308 @param enable: True to enable cellular services 1309 False to disable cellular services 1310 1311 @returns: True if services were set successfully, else False 1312 """ 1313 cellular_services = ['modemmanager', 'modemfwd'] 1314 1315 return self.enable_disable_services(cellular_services, enable) 1316 1317 1318 def enable_disable_ui(self, enable): 1319 """Enable UI service on the DUT 1320 1321 @param enable: True to enable UI services 1322 False to disable UI services 1323 1324 @returns: True if services were set successfully, else False 1325 """ 1326 ui_services = ['ui'] 1327 1328 return self.enable_disable_services(ui_services, enable) 1329 1330 1331 def restart_services(self, services): 1332 """Restart a service on the DUT 1333 1334 @param services: the services, e.g., ['cras',] 1335 1336 @returns: True if services were set successfully, else False 1337 """ 1338 return self._initctl_services(services, 'restart') 1339 1340 1341 def restart_cras(self): 1342 """Restart the cras service on the DUT 1343 1344 @returns: True if cras was restart successfully, else False 1345 """ 1346 return self.restart_services(['cras', ]) 1347 1348 1349 def enable_disable_debug_log(self, enable): 1350 """Enable or disable debug log in DUT 1351 @param enable: True to enable all of the debug log, 1352 False to disable all of the debug log. 1353 """ 1354 level = int(enable) 1355 self.bluetooth_facade.set_debug_log_levels(level, level, level, level) 1356 1357 1358 def start_new_btmon(self): 1359 """ Start a new btmon process and save the log """ 1360 1361 # Kill all btmon process before creating a new one 1362 self.host.run('pkill btmon || true') 1363 1364 # Make sure the directory exists 1365 self.host.run('mkdir -p %s' % self.BTMON_DIR_LOG_PATH) 1366 1367 # Time format. Ex, 2020_02_20_17_52_45 1368 now = time.strftime("%Y_%m_%d_%H_%M_%S") 1369 file_name = 'btsnoop_%s' % now 1370 self.host.run_background('btmon -SAw %s/%s' % (self.BTMON_DIR_LOG_PATH, 1371 file_name)) 1372 1373 def start_new_usbmon(self): 1374 """ Start a new USBMON process and save the log """ 1375 1376 # Kill all usbmon process before creating a new one 1377 self.host.run('pkill tcpdump || true') 1378 1379 # Make sure the directory exists 1380 self.host.run('mkdir -p %s' % self.USBMON_DIR_LOG_PATH) 1381 1382 # Time format. Ex, 2020_02_20_17_52_45 1383 now = time.strftime("%Y_%m_%d_%H_%M_%S") 1384 file_name = 'usbmon_%s' % now 1385 self.host.run_background('tcpdump -i usbmon0 -w %s/%s' % 1386 (self.USBMON_DIR_LOG_PATH, file_name)) 1387 1388 1389 def log_message(self, msg): 1390 """ Write a string to log.""" 1391 self.bluetooth_facade.log_message(msg) 1392 1393 def is_wrt_supported(self): 1394 """ Check if Bluetooth adapter support WRT logs. """ 1395 return self.bluetooth_facade.is_wrt_supported() 1396 1397 def enable_wrt_logs(self): 1398 """ Enable WRT logs from Intel Adapters.""" 1399 return self.bluetooth_facade.enable_wrt_logs() 1400 1401 def collect_wrt_logs(self): 1402 """ Collect WRT logs from Intel Adapters.""" 1403 return self.bluetooth_facade.collect_wrt_logs() 1404 1405 def get_num_connected_devices(self): 1406 """ Return number of remote devices currently connected to the DUT. 1407 1408 @returns: The number of devices known to bluez with the Connected 1409 property active 1410 """ 1411 1412 num_connected_devices = 0 1413 for dev in self.bluetooth_facade.get_devices(): 1414 if dev and dev.get('Connected', 0): 1415 num_connected_devices += 1 1416 1417 return num_connected_devices 1418 1419 @test_retry_and_log 1420 def test_bluetoothd_running(self): 1421 """Test that bluetoothd is running.""" 1422 return self.bluetooth_facade.is_bluetoothd_running() 1423 1424 1425 @test_retry_and_log 1426 def test_start_bluetoothd(self): 1427 """Test that bluetoothd could be started successfully.""" 1428 return self.bluetooth_facade.start_bluetoothd() 1429 1430 1431 @test_retry_and_log 1432 def test_stop_bluetoothd(self): 1433 """Test that bluetoothd could be stopped successfully.""" 1434 return self.bluetooth_facade.stop_bluetoothd() 1435 1436 1437 @test_retry_and_log 1438 def test_has_adapter(self): 1439 """Verify that there is an adapter. This will return True only if both 1440 the kernel and bluetooth daemon see the adapter. 1441 """ 1442 return self.bluetooth_facade.has_adapter() 1443 1444 @test_retry_and_log 1445 def test_adapter_work_state(self): 1446 """Test that the bluetooth adapter is in the correct working state. 1447 1448 This includes that the adapter is detectable, is powered on, 1449 and its hci device is hci0. 1450 """ 1451 has_adapter = self.bluetooth_facade.has_adapter() 1452 is_powered_on = self._wait_for_condition( 1453 self.bluetooth_facade.is_powered_on, method_name()) 1454 hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI 1455 self.results = { 1456 'has_adapter': has_adapter, 1457 'is_powered_on': is_powered_on, 1458 'hci': hci} 1459 return all(self.results.values()) 1460 1461 @test_retry_and_log(False) 1462 def test_adapter_wake_enabled(self): 1463 """Test that the bluetooth adapter is wakeup enabled. 1464 """ 1465 wake_enabled = self._wait_for_condition( 1466 self.bluetooth_facade.is_wake_enabled, method_name(), 1467 timeout=self.ADAPTER_WAKE_ENABLE_TIMEOUT_SECS) 1468 1469 self.results = { 'wake_enabled': wake_enabled } 1470 return any(self.results.values()) 1471 1472 @test_retry_and_log(False) 1473 def test_device_wake_allowed(self, device_address): 1474 """Test that given device can wake the system.""" 1475 self.results = { 1476 'Wake allowed': 1477 self.bluetooth_facade.get_device_property( 1478 device_address, 'WakeAllowed') 1479 } 1480 1481 return all(self.results.values()) 1482 1483 @test_retry_and_log(False) 1484 def test_device_wake_not_allowed(self, device_address): 1485 """Test that given device cannot wake the system.""" 1486 self.results = { 1487 'Wake not allowed': 1488 not self.bluetooth_facade.get_device_property( 1489 device_address, 'WakeAllowed') 1490 } 1491 1492 return all(self.results.values()) 1493 1494 @test_retry_and_log(False) 1495 def test_adapter_set_wake_disabled(self): 1496 """Disable wake and verify it was written. """ 1497 success = self.bluetooth_facade.set_wake_enabled(False) 1498 self.results = { 'disable_wake': success } 1499 return all(self.results.values()) 1500 1501 @test_retry_and_log 1502 def test_power_on_adapter(self): 1503 """Test that the adapter could be powered on successfully.""" 1504 power_on = self.bluetooth_facade.set_powered(True) 1505 is_powered_on = self._wait_for_condition( 1506 self.bluetooth_facade.is_powered_on, method_name()) 1507 1508 self.results = {'power_on': power_on, 'is_powered_on': is_powered_on} 1509 return all(self.results.values()) 1510 1511 1512 @test_retry_and_log 1513 def test_power_off_adapter(self): 1514 """Test that the adapter could be powered off successfully.""" 1515 power_off = self.bluetooth_facade.set_powered(False) 1516 is_powered_off = self._wait_for_condition( 1517 lambda: not self.bluetooth_facade.is_powered_on(), 1518 method_name()) 1519 1520 self.results = { 1521 'power_off': power_off, 1522 'is_powered_off': is_powered_off} 1523 return all(self.results.values()) 1524 1525 1526 @test_retry_and_log 1527 def test_reset_on_adapter(self): 1528 """Test that the adapter could be reset on successfully. 1529 1530 This includes restarting bluetoothd, and removing the settings 1531 and cached devices. 1532 """ 1533 reset_on = self.bluetooth_facade.reset_on() 1534 is_powered_on = self._wait_for_condition( 1535 self.bluetooth_facade.is_powered_on, method_name()) 1536 1537 self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on} 1538 return all(self.results.values()) 1539 1540 1541 @test_retry_and_log 1542 def test_reset_off_adapter(self): 1543 """Test that the adapter could be reset off successfully. 1544 1545 This includes restarting bluetoothd, and removing the settings 1546 and cached devices. 1547 """ 1548 reset_off = self.bluetooth_facade.reset_off() 1549 is_powered_off = self._wait_for_condition( 1550 lambda: not self.bluetooth_facade.is_powered_on(), 1551 method_name()) 1552 1553 self.results = { 1554 'reset_off': reset_off, 1555 'is_powered_off': is_powered_off} 1556 return all(self.results.values()) 1557 1558 1559 def test_is_powered_off(self): 1560 """Check if the adapter is powered off.""" 1561 is_powered_off = not self.bluetooth_facade.is_powered_on() 1562 self.results = {'is_powered_off': is_powered_off} 1563 return all(self.results.values()) 1564 1565 1566 @test_retry_and_log(False) 1567 def test_is_facade_valid(self): 1568 """Checks whether the bluetooth facade is in a good state. 1569 1570 If bluetoothd restarts (i.e. due to a crash), the object proxies will no 1571 longer be valid (because the session will be closed). Check whether the 1572 session failed and wait for a new session if it did. 1573 """ 1574 initially_ok = self.bluetooth_facade.is_bluetoothd_valid() 1575 bluez_started = initially_ok or self.bluetooth_facade.start_bluetoothd() 1576 1577 self.results = { 1578 'initially_ok': initially_ok, 1579 'bluez_started': bluez_started 1580 } 1581 return all(self.results.values()) 1582 1583 1584 @test_retry_and_log(False) 1585 def test_is_adapter_valid(self): 1586 """Verify the bluetooth adapter is retrievable at test start 1587 1588 @raises: error.TestNAError if we fail to retrieve the adapter on 1589 an unsupported chipset 1590 error.TestFail if we fail to retrieve the adapter on any other 1591 platform 1592 1593 @returns: True if the adapter was located properly 1594 """ 1595 1596 if not self.bluetooth_facade.has_adapter(): 1597 logging.error('No adapter available, rebooting to recover') 1598 1599 self.reboot() 1600 1601 chipset = self.get_chipset_name() 1602 1603 if not chipset: 1604 raise error.TestFail('Unknown adapter is missing') 1605 1606 # A missing adapter is a rare but known issue on several platforms 1607 # that have no vendor support (b/169328792). Since there is no fix 1608 # possible, we forgive these failures by raising a TestNA. 1609 if chipset in UNSUPPORTED_CHIPSETS: 1610 raise error.TestNAError('Unsupported adapter is missing') 1611 1612 raise error.TestFail('Adapter is missing') 1613 1614 return True 1615 1616 @test_retry_and_log 1617 def test_UUIDs(self): 1618 """Test that basic profiles are supported.""" 1619 adapter_UUIDs = self.bluetooth_facade.get_UUIDs() 1620 self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values() 1621 if uuid not in adapter_UUIDs] 1622 return not bool(self.results) 1623 1624 1625 @test_retry_and_log 1626 def test_start_discovery(self): 1627 """Test that the adapter could start discovery.""" 1628 start_discovery, _ = self.bluetooth_facade.start_discovery() 1629 is_discovering = self._wait_for_condition( 1630 self.bluetooth_facade.is_discovering, method_name()) 1631 1632 self.results = { 1633 'start_discovery': start_discovery, 1634 'is_discovering': is_discovering} 1635 return all(self.results.values()) 1636 1637 @test_retry_and_log(False) 1638 def test_is_discovering(self): 1639 """Test that the adapter is already discovering.""" 1640 is_discovering = self._wait_for_condition( 1641 self.bluetooth_facade.is_discovering, method_name()) 1642 1643 self.results = {'is_discovering': is_discovering} 1644 return all(self.results.values()) 1645 1646 @test_retry_and_log 1647 def test_stop_discovery(self): 1648 """Test that the adapter could stop discovery.""" 1649 if not self.bluetooth_facade.is_discovering(): 1650 return True 1651 1652 stop_discovery, _ = self.bluetooth_facade.stop_discovery() 1653 is_not_discovering = self._wait_for_condition( 1654 lambda: not self.bluetooth_facade.is_discovering(), 1655 method_name()) 1656 1657 self.results = { 1658 'stop_discovery': stop_discovery, 1659 'is_not_discovering': is_not_discovering} 1660 return all(self.results.values()) 1661 1662 1663 @test_retry_and_log 1664 def test_discoverable(self): 1665 """Test that the adapter could be set discoverable.""" 1666 set_discoverable = self.bluetooth_facade.set_discoverable(True) 1667 is_discoverable = self._wait_for_condition( 1668 self.bluetooth_facade.is_discoverable, method_name()) 1669 1670 self.results = { 1671 'set_discoverable': set_discoverable, 1672 'is_discoverable': is_discoverable} 1673 return all(self.results.values()) 1674 1675 @test_retry_and_log(False) 1676 def test_is_discoverable(self): 1677 """Test that the adapter is discoverable.""" 1678 is_discoverable = self._wait_for_condition( 1679 self.bluetooth_facade.is_discoverable, method_name()) 1680 1681 self.results = {'is_discoverable': is_discoverable} 1682 return all(self.results.values()) 1683 1684 1685 def _test_timeout_property(self, set_property, check_property, set_timeout, 1686 get_timeout, property_name, 1687 timeout_values = [0, 60, 180]): 1688 """Common method to test (Discoverable/Pairable)Timeout . 1689 1690 This is used to test 1691 - DiscoverableTimeout property 1692 - PairableTimeout property 1693 1694 The test performs the following 1695 - Set PropertyTimeout 1696 - Read PropertyTimeout and make sure values match 1697 - Set adapter propety 1698 - In a loop check if property is active 1699 - Test fails property is false before timeout 1700 - Test fails property is True after timeout 1701 Repeat the test for different values for timeout 1702 1703 Note : Value of 0 mean it never timeouts, so the test will 1704 end after 30 seconds. 1705 """ 1706 def check_timeout(timeout): 1707 """Check for timeout value in loop while recording failures.""" 1708 actual_timeout = get_timeout() 1709 if timeout != actual_timeout: 1710 logging.debug('%s timeout value read %s does not ' 1711 'match value set %s, yet', property_name, 1712 actual_timeout, timeout) 1713 return False 1714 else: 1715 return True 1716 1717 def _test_timeout_property(timeout): 1718 # minium time after timeout before checking property 1719 MIN_DELTA_SECS = 3 1720 # Time between checking property 1721 WAIT_TIME_SECS = 2 1722 1723 # Set and read back the timeout value 1724 if not set_timeout(timeout): 1725 logging.error('Setting the %s timeout failed',property_name) 1726 return False 1727 1728 1729 if not self._wait_for_condition(lambda : check_timeout(timeout), 1730 'check_'+property_name): 1731 logging.error('checking %s_timeout value timed out', 1732 property_name) 1733 return False 1734 1735 # 1736 # Check that the timeout works 1737 # Check property is true until timeout 1738 # and then it is not 1739 1740 property_set = set_property(True) 1741 property_is_true = self._wait_for_condition(check_property, 1742 method_name()) 1743 1744 self.results = { 'set_%s' % property_name : property_set, 1745 'is_%s' % property_name: property_is_true} 1746 logging.debug(self.results) 1747 1748 if not all(self.results.values()): 1749 logging.error('Setting %s failed',property_name) 1750 return False 1751 1752 start_time = time.time() 1753 while True: 1754 time.sleep(WAIT_TIME_SECS) 1755 cur_time = time.time() 1756 property_set = check_property() 1757 time_elapsed = cur_time - start_time 1758 1759 # Ignore check_property results made near the timeout 1760 # to avoid spurious failures. 1761 if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS: 1762 continue 1763 1764 # Timeout of zero seconds mean that the adapter never times out 1765 # Check for 30 seconds and then exit the test. 1766 if timeout == 0: 1767 if not property_set: 1768 logging.error('Adapter is not %s after %.2f ' 1769 'secs with a timeout of zero ', 1770 property_name, time_elapsed) 1771 return False 1772 elif time_elapsed > 30: 1773 logging.debug('Adapter %s after %.2f seconds ' 1774 'with timeout of zero as expected' , 1775 property_name, time_elapsed) 1776 return True 1777 continue 1778 1779 # 1780 # Check if property is true till timeout ends and 1781 # false afterwards 1782 # 1783 if time_elapsed < timeout: 1784 if not property_set: 1785 logging.error('Adapter is not %s after %.2f ' 1786 'secs before timeout of %.2f', 1787 property_name, time_elapsed, timeout) 1788 return False 1789 else: 1790 if property_set: 1791 logging.error('Adapter is still %s after ' 1792 ' %.2f secs with timeout of %.2f', 1793 property_name, time_elapsed, timeout) 1794 return False 1795 else: 1796 logging.debug('Adapter not %s after %.2f ' 1797 'secs with timeout of %.2f as expected ', 1798 property_name, time_elapsed, timeout) 1799 return True 1800 1801 default_value = check_property() 1802 default_timeout = get_timeout() 1803 1804 result = [] 1805 try: 1806 for timeout in timeout_values: 1807 result.append(_test_timeout_property(timeout)) 1808 logging.debug("Test returning %s", all(result)) 1809 return all(result) 1810 except: 1811 logging.error("exception in test_%s_timeout",property_name) 1812 raise 1813 finally: 1814 # Set the property back to default value permanently before 1815 # exiting the test 1816 set_timeout(0) 1817 set_property(default_value) 1818 # Set the timeout back to default value before exiting the test 1819 set_timeout(default_timeout) 1820 1821 1822 @test_retry_and_log 1823 def test_discoverable_timeout(self, timeout_values = [0, 60, 180]): 1824 """Test adapter dbus property DiscoverableTimeout.""" 1825 return self._test_timeout_property( 1826 set_property = self.bluetooth_facade.set_discoverable, 1827 check_property = self.bluetooth_facade.is_discoverable, 1828 set_timeout = self.bluetooth_facade.set_discoverable_timeout, 1829 get_timeout = self.bluetooth_facade.get_discoverable_timeout, 1830 property_name = 'discoverable', 1831 timeout_values = timeout_values) 1832 1833 @test_retry_and_log 1834 def test_pairable_timeout(self, timeout_values = [0, 60, 180]): 1835 """Test adapter dbus property PairableTimeout.""" 1836 return self._test_timeout_property( 1837 set_property = self.bluetooth_facade.set_pairable, 1838 check_property = self.bluetooth_facade.is_pairable, 1839 set_timeout = self.bluetooth_facade.set_pairable_timeout, 1840 get_timeout = self.bluetooth_facade.get_pairable_timeout, 1841 property_name = 'pairable', 1842 timeout_values = timeout_values) 1843 1844 1845 @test_retry_and_log 1846 def test_nondiscoverable(self): 1847 """Test that the adapter could be set non-discoverable.""" 1848 set_nondiscoverable = self.bluetooth_facade.set_discoverable(False) 1849 is_nondiscoverable = self._wait_for_condition( 1850 lambda: not self.bluetooth_facade.is_discoverable(), 1851 method_name()) 1852 1853 self.results = { 1854 'set_nondiscoverable': set_nondiscoverable, 1855 'is_nondiscoverable': is_nondiscoverable} 1856 return all(self.results.values()) 1857 1858 1859 @test_retry_and_log 1860 def test_pairable(self): 1861 """Test that the adapter could be set pairable.""" 1862 set_pairable = self.bluetooth_facade.set_pairable(True) 1863 is_pairable = self._wait_for_condition( 1864 self.bluetooth_facade.is_pairable, method_name()) 1865 1866 self.results = { 1867 'set_pairable': set_pairable, 1868 'is_pairable': is_pairable} 1869 return all(self.results.values()) 1870 1871 1872 @test_retry_and_log 1873 def test_nonpairable(self): 1874 """Test that the adapter could be set non-pairable.""" 1875 set_nonpairable = self.bluetooth_facade.set_pairable(False) 1876 is_nonpairable = self._wait_for_condition( 1877 lambda: not self.bluetooth_facade.is_pairable(), method_name()) 1878 1879 self.results = { 1880 'set_nonpairable': set_nonpairable, 1881 'is_nonpairable': is_nonpairable} 1882 return all(self.results.values()) 1883 1884 1885 @test_retry_and_log(False) 1886 def test_check_valid_adapter_id(self): 1887 """Fail if the Bluetooth ID is not in the correct format. 1888 1889 @returns True if adapter ID follows expected format, False otherwise 1890 """ 1891 1892 device = self.get_base_platform_name() 1893 adapter_info = self.get_adapter_properties() 1894 1895 # Don't complete test if this is a reference board 1896 if device in self.REFERENCE_BOARDS: 1897 return True 1898 1899 modalias = adapter_info['Modalias'] 1900 logging.debug('Saw Bluetooth ID of: %s', modalias) 1901 1902 # Valid Device ID is: 1903 # <00E0(Google)>/<C405(Chrome OS)>/<non-zero versionNumber> 1904 bt_format = 'bluetooth:v00E0pC405d(?!0000)' 1905 1906 if not re.match(bt_format, modalias): 1907 return False 1908 1909 return True 1910 1911 1912 @test_retry_and_log(False) 1913 def test_check_valid_alias(self): 1914 """Fail if the Bluetooth alias is not in the correct format. 1915 1916 @returns True if adapter alias follows expected format, False otherwise 1917 """ 1918 1919 device = self.get_base_platform_name() 1920 adapter_info = self.get_adapter_properties() 1921 1922 # Don't complete test if this is a reference board 1923 if device in self.REFERENCE_BOARDS: 1924 return True 1925 1926 alias = adapter_info['Alias'] 1927 logging.debug('Saw Bluetooth Alias of: %s', alias) 1928 1929 device_type = self.host.get_board_type().lower() 1930 alias_format = '%s_[a-z0-9]{4}' % device_type 1931 1932 self.results = {} 1933 1934 alias_was_correct = True 1935 if not re.match(alias_format, alias.lower()): 1936 alias_was_correct = False 1937 logging.info('unexpected alias %s found', alias) 1938 self.results['alias_found'] = alias 1939 1940 self.results['alias_was_correct'] = alias_was_correct 1941 return all(self.results.values()) 1942 1943 1944 # ------------------------------------------------------------------- 1945 # Tests about general discovering, pairing, and connection 1946 # ------------------------------------------------------------------- 1947 1948 1949 @test_retry_and_log(False) 1950 def test_discover_device(self, 1951 device_address, 1952 start_discovery=True, 1953 stop_discovery=True): 1954 """Test that the adapter could discover the specified device address. 1955 1956 @param device_address: Address of the device. 1957 @param start_discovery: Whether to start discovery. Set to False if you 1958 call start_discovery before calling this. 1959 @param stop_discovery: Whether to stop discovery at the end. If this is 1960 set to False, make sure to call 1961 test_stop_discovery afterwards. 1962 1963 @returns: True if the device is found. False otherwise. 1964 1965 """ 1966 has_device_initially = False 1967 discovery_stopped = False 1968 is_not_discovering = False 1969 device_discovered = False 1970 # If start discovery is not set, discovery must already be started 1971 discovery_started = not start_discovery 1972 has_device = self.bluetooth_facade.has_device 1973 1974 if has_device(device_address): 1975 has_device_initially = True 1976 else: 1977 if start_discovery: 1978 discovery_started = self.bluetooth_facade.start_discovery() 1979 1980 if discovery_started: 1981 try: 1982 utils.poll_for_condition( 1983 condition=(lambda: has_device(device_address)), 1984 timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS, 1985 sleep_interval= 1986 self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 1987 desc='Waiting for discovering %s' % device_address) 1988 device_discovered = True 1989 except utils.TimeoutError as e: 1990 logging.error('test_discover_device: %s', e) 1991 except Exception as e: 1992 logging.error('test_discover_device: %s', e) 1993 err = ('bluetoothd probably crashed.' 1994 'Check out /var/log/messages') 1995 logging.error(err) 1996 except: 1997 logging.error('test_discover_device: unexpected error') 1998 1999 if start_discovery and stop_discovery: 2000 discovery_stopped, _ = self.bluetooth_facade.stop_discovery() 2001 is_not_discovering = self._wait_for_condition( 2002 lambda: not self.bluetooth_facade.is_discovering(), 2003 method_name()) 2004 2005 self.results = { 2006 'has_device_initially': has_device_initially, 2007 'should_start_discovery': start_discovery, 2008 'should_stop_discovery': stop_discovery, 2009 'start_discovery': discovery_started, 2010 'stop_discovery': discovery_stopped, 2011 'is_not_discovering': is_not_discovering, 2012 'device_discovered': device_discovered} 2013 2014 # Make sure a discovered device properly started and stopped discovery 2015 device_found = device_discovered and discovery_started and ( 2016 discovery_stopped and is_not_discovering 2017 if stop_discovery else True) 2018 2019 return has_device_initially or device_found 2020 2021 2022 def _test_discover_by_device(self, device): 2023 return device.Discover(self.bluetooth_facade.address) 2024 2025 @test_retry_and_log(False, messages_start=False, messages_stop=False) 2026 def test_discover_by_device(self, device): 2027 """Test that the device could discover the adapter address. 2028 2029 @param device: Meta device to represent peer device. 2030 2031 @returns: True if the adapter is found by the device. 2032 """ 2033 adapter_discovered = False 2034 discover_by_device = self._test_discover_by_device 2035 discovered_initially = discover_by_device(device) 2036 2037 if not discovered_initially: 2038 try: 2039 utils.poll_for_condition( 2040 condition=(lambda: discover_by_device(device)), 2041 timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS, 2042 sleep_interval= 2043 self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 2044 desc='Waiting for adapter to be discovered') 2045 adapter_discovered = True 2046 except utils.TimeoutError as e: 2047 logging.error('test_discover_by_device: %s', e) 2048 except Exception as e: 2049 logging.error('test_discover_by_device: %s', e) 2050 err = ('bluetoothd probably crashed.' 2051 'Check out /var/log/messages') 2052 logging.error(err) 2053 except: 2054 logging.error('test_discover_by_device: unexpected error') 2055 2056 self.results = { 2057 'adapter_discovered_initially': discovered_initially, 2058 'adapter_discovered': adapter_discovered 2059 } 2060 return any(self.results.values()) 2061 2062 @test_retry_and_log(False, messages_start=False, messages_stop=False) 2063 def test_discover_by_device_fails(self, device): 2064 """Test that the device could not discover the adapter address. 2065 2066 @param device: Meta device to represent peer device. 2067 2068 @returns False if the adapter is found by the device. 2069 """ 2070 self.results = { 2071 'adapter_discovered': self._test_discover_by_device(device) 2072 } 2073 return not any(self.results.values()) 2074 2075 @test_retry_and_log(False, messages_start=False, messages_stop=False) 2076 def test_device_set_discoverable(self, device, discoverable): 2077 """Test that we could set the peer device to discoverable. """ 2078 try: 2079 device.SetDiscoverable(discoverable) 2080 except: 2081 return False 2082 2083 return True 2084 2085 @test_retry_and_log 2086 def test_pairing(self, device_address, pin, trusted=True): 2087 """Test that the adapter could pair with the device successfully. 2088 2089 @param device_address: Address of the device. 2090 @param pin: pin code to pair with the device. 2091 @param trusted: indicating whether to set the device trusted. 2092 2093 @returns: True if pairing succeeds. False otherwise. 2094 2095 """ 2096 2097 def _pair_device(): 2098 """Pair to the device. 2099 2100 @returns: True if it could pair with the device. False otherwise. 2101 2102 """ 2103 return self.bluetooth_facade.pair_legacy_device( 2104 device_address, pin, trusted, 2105 self.ADAPTER_PAIRING_TIMEOUT_SECS) 2106 2107 2108 def _verify_connection_info(): 2109 """Verify that connection info to device is retrievable. 2110 2111 @returns: True if the connection info is retrievable. 2112 False otherwise. 2113 """ 2114 return (self.bluetooth_facade.get_connection_info(device_address) 2115 is not None) 2116 2117 def _verify_connected(): 2118 """Verify the device is connected. 2119 2120 @returns: True if the device is connected, False otherwise. 2121 """ 2122 return self.bluetooth_facade.device_is_connected(device_address) 2123 2124 has_device = False 2125 paired = False 2126 connected = False 2127 connection_info_retrievable = False 2128 connected_devices = self.get_num_connected_devices() 2129 2130 if self.bluetooth_facade.has_device(device_address): 2131 has_device = True 2132 try: 2133 utils.poll_for_condition( 2134 condition=_pair_device, 2135 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 2136 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2137 desc='Waiting for pairing %s' % device_address) 2138 paired = True 2139 except utils.TimeoutError as e: 2140 logging.error('test_pairing: %s', e) 2141 except: 2142 logging.error('test_pairing: unexpected error') 2143 2144 connection_info_retrievable = _verify_connection_info() 2145 connected = _verify_connected() 2146 2147 self.results = { 2148 'has_device': has_device, 2149 'paired': paired, 2150 'connected': connected, 2151 'connection_info_retrievable': connection_info_retrievable, 2152 'connection_num': connected_devices + 1 2153 } 2154 2155 return all(self.results.values()) 2156 2157 @test_retry_and_log 2158 def test_remove_pairing(self, device_address): 2159 """Test that the adapter could remove the paired device. 2160 2161 @param device_address: Address of the device. 2162 2163 @returns: True if the device is removed successfully. False otherwise. 2164 2165 """ 2166 device_is_paired_initially = self.bluetooth_facade.device_is_paired( 2167 device_address) 2168 remove_pairing = False 2169 pairing_removed = False 2170 2171 if device_is_paired_initially: 2172 remove_pairing = self.bluetooth_facade.remove_device_object( 2173 device_address) 2174 pairing_removed = not self.bluetooth_facade.device_is_paired( 2175 device_address) 2176 2177 self.results = { 2178 'device_is_paired_initially': device_is_paired_initially, 2179 'remove_pairing': remove_pairing, 2180 'pairing_removed': pairing_removed} 2181 return all(self.results.values()) 2182 2183 2184 def test_set_trusted(self, device_address, trusted=True): 2185 """Test whether the device with the specified address is trusted. 2186 2187 @param device_address: Address of the device. 2188 @param trusted : True or False indicating if trusted is expected. 2189 2190 @returns: True if the device's "Trusted" property is as specified; 2191 False otherwise. 2192 2193 """ 2194 2195 set_trusted = self.bluetooth_facade.set_trusted( 2196 device_address, trusted) 2197 2198 actual_trusted = self.bluetooth_facade.get_device_property( 2199 device_address, 'Trusted') 2200 2201 self.results = { 2202 'set_trusted': set_trusted, 2203 'actual trusted': actual_trusted, 2204 'expected trusted': trusted} 2205 return actual_trusted == trusted 2206 2207 2208 @test_retry_and_log 2209 def test_connection_by_adapter(self, device_address): 2210 """Test that the adapter of dut could connect to the device successfully 2211 2212 It is the caller's responsibility to pair to the device before 2213 doing connection. 2214 2215 @param device_address: Address of the device. 2216 2217 @returns: True if connection is performed. False otherwise. 2218 2219 """ 2220 2221 def _connect_device(): 2222 """Connect to the device. 2223 2224 @returns: True if it could connect to the device. False otherwise. 2225 2226 """ 2227 return self.bluetooth_facade.connect_device(device_address) 2228 2229 2230 has_device = False 2231 connected = False 2232 if self.bluetooth_facade.has_device(device_address): 2233 has_device = True 2234 try: 2235 utils.poll_for_condition( 2236 condition=_connect_device, 2237 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 2238 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2239 desc='Waiting for connecting to %s' % device_address) 2240 connected = True 2241 except utils.TimeoutError as e: 2242 logging.error('test_connection_by_adapter: %s', e) 2243 except: 2244 logging.error('test_connection_by_adapter: unexpected error') 2245 2246 self.results = {'has_device': has_device, 'connected': connected} 2247 return all(self.results.values()) 2248 2249 2250 @test_retry_and_log 2251 def test_disconnection_by_adapter(self, device_address): 2252 """Test that the adapter of dut could disconnect the device successfully 2253 2254 @param device_address: Address of the device. 2255 2256 @returns: True if disconnection is performed. False otherwise. 2257 2258 """ 2259 return self.bluetooth_facade.disconnect_device(device_address) 2260 2261 2262 def _enter_command_mode(self, device): 2263 """Let the device enter command mode. 2264 2265 Before using the device, need to call this method to make sure 2266 it is in the command mode. 2267 2268 @param device: the bluetooth HID device 2269 2270 @returns: True if successful. False otherwise. 2271 2272 """ 2273 result = _is_successful(_run_method(device.EnterCommandMode, 2274 'EnterCommandMode')) 2275 if not result: 2276 logging.error('EnterCommandMode failed') 2277 return result 2278 2279 2280 @test_retry_and_log 2281 def test_connection_by_device(self, device): 2282 """Test that the device could connect to the adapter successfully. 2283 2284 This emulates the behavior that a device may initiate a 2285 connection request after waking up from power saving mode. 2286 2287 @param device: the bluetooth HID device 2288 2289 @returns: True if connection is performed correctly by device and 2290 the adapter also enters connection state. 2291 False otherwise. 2292 2293 """ 2294 if not self._enter_command_mode(device): 2295 return False 2296 2297 method_name = 'test_connection_by_device' 2298 connection_by_device = False 2299 adapter_address = self.bluetooth_facade.address 2300 try: 2301 connection_by_device = device.ConnectToRemoteAddress( 2302 adapter_address) 2303 except Exception as e: 2304 logging.error('%s (device): %s', method_name, e) 2305 except: 2306 logging.error('%s (device): unexpected error', method_name) 2307 2308 connection_seen_by_adapter = False 2309 device_address = device.address 2310 device_is_connected = self.bluetooth_facade.device_is_connected 2311 try: 2312 utils.poll_for_condition( 2313 condition=lambda: device_is_connected(device_address), 2314 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 2315 desc=('Waiting for connection from %s' % device_address)) 2316 connection_seen_by_adapter = True 2317 2318 # Although the connect may be complete, it can take a few 2319 # seconds for the input device to be ready for use 2320 time.sleep(self.ADAPTER_HID_INPUT_DELAY) 2321 except utils.TimeoutError as e: 2322 logging.error('%s (adapter): %s', method_name, e) 2323 except: 2324 logging.error('%s (adapter): unexpected error', method_name) 2325 2326 self.results = { 2327 'connection_by_device': connection_by_device, 2328 'connection_seen_by_adapter': connection_seen_by_adapter} 2329 return all(self.results.values()) 2330 2331 @test_retry_and_log(True, messages_start=False, messages_stop=False) 2332 def test_connection_by_device_only(self, device, adapter_address): 2333 """Test that the device could connect to adapter successfully. 2334 2335 This is a modified version of test_connection_by_device that only 2336 communicates with the peer device and not the host (in case the host is 2337 suspended for example). 2338 2339 @param device: the bluetooth peer device 2340 @param adapter_address: address of the adapter 2341 2342 @returns: True if the connection was established by the device or False. 2343 """ 2344 connected = device.ConnectToRemoteAddress(adapter_address) 2345 if connected: 2346 # Although the connect may be complete, it can take a few 2347 # seconds for the input device to be ready for use 2348 time.sleep(self.ADAPTER_HID_INPUT_DELAY) 2349 2350 self.results = { 2351 'connection_by_device': connected 2352 } 2353 2354 return all(self.results.values()) 2355 2356 2357 @test_retry_and_log 2358 def test_disconnection_by_device(self, device): 2359 """Test that the device could disconnect the adapter successfully. 2360 2361 This emulates the behavior that a device may initiate a 2362 disconnection request before going into power saving mode. 2363 2364 Note: should not try to enter command mode in this method. When 2365 a device is connected, there is no way to enter command mode. 2366 One could just issue a special disconnect command without 2367 entering command mode. 2368 2369 @param device: the bluetooth HID device 2370 2371 @returns: True if disconnection is performed correctly by device and 2372 the adapter also observes the disconnection. 2373 False otherwise. 2374 2375 """ 2376 method_name = 'test_disconnection_by_device' 2377 disconnection_by_device = False 2378 try: 2379 device.Disconnect() 2380 disconnection_by_device = True 2381 except Exception as e: 2382 logging.error('%s (device): %s', method_name, e) 2383 except: 2384 logging.error('%s (device): unexpected error', method_name) 2385 2386 disconnection_seen_by_adapter = False 2387 device_address = device.address 2388 device_is_connected = self.bluetooth_facade.device_is_connected 2389 try: 2390 utils.poll_for_condition( 2391 condition=lambda: not device_is_connected(device_address), 2392 timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS, 2393 desc=('Waiting for disconnection from %s' % device_address)) 2394 disconnection_seen_by_adapter = True 2395 except utils.TimeoutError as e: 2396 logging.error('%s (adapter): %s', method_name, e) 2397 except: 2398 logging.error('%s (adapter): unexpected error', method_name) 2399 2400 self.results = { 2401 'disconnection_by_device': disconnection_by_device, 2402 'disconnection_seen_by_adapter': disconnection_seen_by_adapter} 2403 return all(self.results.values()) 2404 2405 2406 @test_retry_and_log(False) 2407 def test_device_is_connected(self, device_address): 2408 """Test that device address given is currently connected. 2409 2410 @param device_address: Address of the device. 2411 2412 @returns: True if the device is connected. 2413 False otherwise. 2414 """ 2415 2416 def _is_connected(): 2417 """Test if device is connected. 2418 2419 @returns: True if device is connected. False otherwise. 2420 2421 """ 2422 return self.bluetooth_facade.device_is_connected(device_address) 2423 2424 2425 method_name = 'test_device_is_connected' 2426 has_device = False 2427 connected = False 2428 if self.bluetooth_facade.has_device(device_address): 2429 has_device = True 2430 try: 2431 utils.poll_for_condition( 2432 condition=_is_connected, 2433 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 2434 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2435 desc='Waiting to check connection to %s' % 2436 device_address) 2437 connected = True 2438 except utils.TimeoutError as e: 2439 logging.error('%s: %s', method_name, e) 2440 except: 2441 logging.error('%s: unexpected error', method_name) 2442 self.results = {'has_device': has_device, 'connected': connected} 2443 return all(self.results.values()) 2444 2445 2446 @test_retry_and_log(False) 2447 def test_device_is_not_connected(self, device_address): 2448 """Test that device address given is NOT currently connected. 2449 2450 @param device_address: Address of the device. 2451 2452 @returns: True if the device is NOT connected. 2453 False otherwise. 2454 2455 """ 2456 2457 def _is_not_connected(): 2458 """Test if device is not connected. 2459 2460 @returns: True if device is not connected. False otherwise. 2461 2462 """ 2463 return not self.bluetooth_facade.device_is_connected( 2464 device_address) 2465 2466 2467 method_name = 'test_device_is_not_connected' 2468 not_connected = False 2469 if self.bluetooth_facade.has_device(device_address): 2470 try: 2471 utils.poll_for_condition( 2472 condition=_is_not_connected, 2473 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 2474 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2475 desc='Waiting to check connection to %s' % 2476 device_address) 2477 not_connected = True 2478 except utils.TimeoutError as e: 2479 logging.error('%s: %s', method_name, e) 2480 except: 2481 logging.error('%s: unexpected error', method_name) 2482 raise 2483 else: 2484 not_connected = True 2485 self.results = {'not_connected': not_connected} 2486 return all(self.results.values()) 2487 2488 2489 @test_retry_and_log 2490 def test_device_is_paired(self, device_address): 2491 """Test that the device address given is currently paired. 2492 2493 @param device_address: Address of the device. 2494 2495 @returns: True if the device is paired. 2496 False otherwise. 2497 2498 """ 2499 def _is_paired(): 2500 """Test if device is paired. 2501 2502 @returns: True if device is paired. False otherwise. 2503 2504 """ 2505 return self.bluetooth_facade.device_is_paired(device_address) 2506 2507 2508 method_name = 'test_device_is_paired' 2509 has_device = False 2510 paired = False 2511 if self.bluetooth_facade.has_device(device_address): 2512 has_device = True 2513 try: 2514 utils.poll_for_condition( 2515 condition=_is_paired, 2516 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 2517 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 2518 desc='Waiting for connection to %s' % device_address) 2519 paired = True 2520 except utils.TimeoutError as e: 2521 logging.error('%s: %s', method_name, e) 2522 except: 2523 logging.error('%s: unexpected error', method_name) 2524 self.results = {'has_device': has_device, 'paired': paired} 2525 return all(self.results.values()) 2526 2527 2528 def _get_device_name(self, device_address): 2529 """Get the device name. 2530 2531 @returns: True if the device name is derived. None otherwise. 2532 2533 """ 2534 2535 self.discovered_device_name = self.bluetooth_facade.get_device_property( 2536 device_address, 'Name') 2537 2538 return bool(self.discovered_device_name) 2539 2540 2541 @test_retry_and_log 2542 def test_device_name(self, device_address, expected_device_name): 2543 """Test that the device name discovered by the adapter is correct. 2544 2545 @param device_address: Address of the device. 2546 @param expected_device_name: the bluetooth device name 2547 2548 @returns: True if the discovered_device_name is expected_device_name. 2549 False otherwise. 2550 2551 """ 2552 try: 2553 utils.poll_for_condition( 2554 condition=lambda: self._get_device_name(device_address), 2555 timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS, 2556 sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 2557 desc='Waiting for device name of %s' % device_address) 2558 except utils.TimeoutError as e: 2559 logging.error('test_device_name: %s', e) 2560 except: 2561 logging.error('test_device_name: unexpected error') 2562 2563 self.results = { 2564 'expected_device_name': expected_device_name, 2565 'discovered_device_name': self.discovered_device_name} 2566 return self.discovered_device_name == expected_device_name 2567 2568 2569 @test_retry_and_log 2570 def test_device_class_of_service(self, device_address, 2571 expected_class_of_service): 2572 """Test that the discovered device class of service is as expected. 2573 2574 @param device_address: Address of the device. 2575 @param expected_class_of_service: the expected class of service 2576 2577 @returns: True if the discovered class of service matches the 2578 expected class of service. False otherwise. 2579 2580 """ 2581 2582 device_class = self.bluetooth_facade.get_device_property(device_address, 2583 'Class') 2584 discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK 2585 if device_class else None) 2586 2587 self.results = { 2588 'device_class': device_class, 2589 'expected_class_of_service': expected_class_of_service, 2590 'discovered_class_of_service': discovered_class_of_service} 2591 return discovered_class_of_service == expected_class_of_service 2592 2593 2594 @test_retry_and_log 2595 def test_device_class_of_device(self, device_address, 2596 expected_class_of_device): 2597 """Test that the discovered device class of device is as expected. 2598 2599 @param device_address: Address of the device. 2600 @param expected_class_of_device: the expected class of device 2601 2602 @returns: True if the discovered class of device matches the 2603 expected class of device. False otherwise. 2604 2605 """ 2606 2607 device_class = self.bluetooth_facade.get_device_property(device_address, 2608 'Class') 2609 discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK 2610 if device_class else None) 2611 2612 self.results = { 2613 'device_class': device_class, 2614 'expected_class_of_device': expected_class_of_device, 2615 'discovered_class_of_device': discovered_class_of_device} 2616 return discovered_class_of_device == expected_class_of_device 2617 2618 2619 def _get_btmon_log(self, method, logging_timespan=1): 2620 """Capture the btmon log when executing the specified method. 2621 2622 @param method: the method to capture log. 2623 The method would be executed only when it is not None. 2624 This allows us to simply capture btmon log without 2625 executing any command. 2626 @param logging_timespan: capture btmon log for logging_timespan seconds. 2627 2628 """ 2629 self.bluetooth_le_facade.btmon_start() 2630 self.advertising_msg = method() if method else '' 2631 time.sleep(logging_timespan) 2632 self.bluetooth_le_facade.btmon_stop() 2633 2634 2635 def convert_to_adv_jiffies(self, adv_interval_ms): 2636 """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms. 2637 2638 @param adv_interval_ms: an advertising interval 2639 2640 @returns: the equivalent jiffies 2641 2642 """ 2643 return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT 2644 2645 2646 def compute_duration(self, max_adv_interval_ms): 2647 """Compute duration from max_adv_interval_ms. 2648 2649 Advertising duration is calculated approximately as 2650 duration = max_adv_interval_ms / 1000.0 * 1.1 2651 2652 @param max_adv_interval_ms: max advertising interval in milliseconds. 2653 2654 @returns: duration in seconds. 2655 2656 """ 2657 return max_adv_interval_ms / 1000.0 * 1.1 2658 2659 2660 def compute_logging_timespan(self, max_adv_interval_ms): 2661 """Compute the logging timespan from max_adv_interval_ms. 2662 2663 The logging timespan is the time needed to record btmon log. 2664 2665 @param max_adv_interval_ms: max advertising interval in milliseconds. 2666 2667 @returns: logging_timespan in seconds. 2668 2669 """ 2670 duration = self.compute_duration(max_adv_interval_ms) 2671 logging_timespan = max(self.count_advertisements * duration, 1) 2672 return logging_timespan 2673 2674 2675 @test_retry_and_log(False) 2676 def test_check_duration_and_intervals(self, min_adv_interval_ms, 2677 max_adv_interval_ms, 2678 number_advertisements): 2679 """Verify that every advertisements are scheduled according to the 2680 duration and intervals. 2681 2682 An advertisement would be scheduled at the time span of 2683 duration * number_advertisements 2684 2685 @param min_adv_interval_ms: min advertising interval in milliseconds. 2686 @param max_adv_interval_ms: max advertising interval in milliseconds. 2687 @param number_advertisements: the number of existing advertisements 2688 2689 @returns: True if all advertisements are scheduled based on the 2690 duration and intervals. 2691 2692 """ 2693 2694 2695 def within_tolerance(expected, actual, max_error=0.1): 2696 """Determine if the percent error is within specified tolerance. 2697 2698 @param expected: The expected value. 2699 @param actual: The actual (measured) value. 2700 @param max_error: The maximum percent error acceptable. 2701 2702 @returns: True if the percent error is less than or equal to 2703 max_error. 2704 """ 2705 return abs(expected - actual) / abs(expected) <= max_error 2706 2707 2708 start_str = 'Set Advertising Intervals:' 2709 search_strings = ['HCI Command: LE Set Advertising Data', 'Company'] 2710 search_str = '|'.join(search_strings) 2711 2712 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 2713 start_str=start_str) 2714 2715 # Company string looks like 2716 # Company: not assigned (65283) 2717 company_pattern = re.compile('Company:.*\((\d*)\)') 2718 2719 # The string with timestamp looks like 2720 # < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236 2721 set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)' 2722 set_adv_time_pattern = re.compile(set_adv_time_str) 2723 2724 adv_timestamps = {} 2725 timestamp = None 2726 manufacturer_id = None 2727 for line in contents: 2728 result = set_adv_time_pattern.search(line) 2729 if result: 2730 timestamp = float(result.group(1)) 2731 2732 result = company_pattern.search(line) 2733 if result: 2734 manufacturer_id = '0x%04x' % int(result.group(1)) 2735 2736 if timestamp and manufacturer_id: 2737 if manufacturer_id not in adv_timestamps: 2738 adv_timestamps[manufacturer_id] = [] 2739 adv_timestamps[manufacturer_id].append(timestamp) 2740 timestamp = None 2741 manufacturer_id = None 2742 2743 duration = self.compute_duration(max_adv_interval_ms) 2744 expected_timespan = duration * number_advertisements 2745 2746 check_duration = True 2747 for manufacturer_id, values in six.iteritems(adv_timestamps): 2748 logging.debug('manufacturer_id %s: %s', manufacturer_id, values) 2749 timespans = [values[i] - values[i - 1] 2750 for i in range(1, len(values))] 2751 errors = [timespans[i] for i in range(len(timespans)) 2752 if not within_tolerance(expected_timespan, timespans[i])] 2753 logging.debug('timespans: %s', timespans) 2754 logging.debug('errors: %s', errors) 2755 if bool(errors): 2756 check_duration = False 2757 2758 # Verify that the advertising intervals are also correct. 2759 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 2760 self._verify_advertising_intervals(min_adv_interval_ms, 2761 max_adv_interval_ms)) 2762 2763 self.results = { 2764 'check_duration': check_duration, 2765 'max_adv_interval_ms_found': max_adv_interval_ms_found, 2766 'max_adv_interval_ms_found': max_adv_interval_ms_found, 2767 } 2768 return all(self.results.values()) 2769 2770 2771 def _get_min_max_intervals_strings(self, min_adv_interval_ms, 2772 max_adv_interval_ms): 2773 """Get the min and max advertising intervals strings shown in btmon. 2774 2775 Advertising intervals shown in the btmon log look like 2776 Min advertising interval: 1280.000 msec (0x0800) 2777 Max advertising interval: 1280.000 msec (0x0800) 2778 2779 @param min_adv_interval_ms: min advertising interval in milliseconds. 2780 @param max_adv_interval_ms: max advertising interval in milliseconds. 2781 2782 @returns: the min and max intervals strings. 2783 2784 """ 2785 min_str = ('Min advertising interval: %.3f msec (0x%04x)' % 2786 (min_adv_interval_ms, 2787 min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)) 2788 logging.debug('min_adv_interval_ms: %s', min_str) 2789 2790 max_str = ('Max advertising interval: %.3f msec (0x%04x)' % 2791 (max_adv_interval_ms, 2792 max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)) 2793 logging.debug('max_adv_interval_ms: %s', max_str) 2794 2795 return (min_str, max_str) 2796 2797 2798 def _verify_advertising_intervals(self, min_adv_interval_ms, 2799 max_adv_interval_ms): 2800 """Verify min and max advertising intervals. 2801 2802 Advertising intervals look like 2803 Min advertising interval: 1280.000 msec (0x0800) 2804 Max advertising interval: 1280.000 msec (0x0800) 2805 2806 @param min_adv_interval_ms: min advertising interval in milliseconds. 2807 @param max_adv_interval_ms: max advertising interval in milliseconds. 2808 2809 @returns: a tuple of (True, True) if both min and max advertising 2810 intervals could be found. Otherwise, the corresponding element 2811 in the tuple if False. 2812 2813 """ 2814 min_str, max_str = self._get_min_max_intervals_strings( 2815 min_adv_interval_ms, max_adv_interval_ms) 2816 2817 min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str) 2818 max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str) 2819 2820 return min_adv_interval_ms_found, max_adv_interval_ms_found 2821 2822 2823 def _verify_scan_response_data(self, adv_data): 2824 """Verify advertisement's scan response data is correct 2825 2826 Unlike the other fixed advertising fields, Scan Response Data is set 2827 in a tag-value data format. This function helps verify the data format 2828 for specific tag values to ensure scan response was propagated correctly 2829 2830 @param adv_data: Dictionary defining advertising fields to be registered 2831 with bluetoothd daemon's RegisterAdvertisement interface 2832 2833 @returns: True if all Registered Scan Response tags were located in 2834 btmon trace, False otherwise 2835 """ 2836 2837 scan_rsp = adv_data.get('ScanResponseData') 2838 if not scan_rsp: 2839 return True 2840 2841 for tag, data in scan_rsp.items(): 2842 # Validate 16 bit Service Data tag 2843 if int(tag, 16) == 0x16: 2844 # First two bytes of data are endian-corrected UUID, followed 2845 # by service data 2846 uuid = '%x%x' % (data[1], data[0]) 2847 data_str = ''.join( 2848 ['%02x' % data[i] for i in range(2, len(data))]) 2849 2850 # Service data has the following format in btmon trace: 2851 # Service Data (UUID 0xfef3): 01020304 2852 search_str = 'Service Data (UUID 0x{}): {}'.format( 2853 uuid, data_str) 2854 2855 # Fail if data can't be located in btmon trace 2856 if not self.bluetooth_le_facade.btmon_find(search_str): 2857 return False 2858 2859 return True 2860 2861 def test_advertising_flags(self, flag_strs=[]): 2862 """Verify that advertising flags are set in registered advertisement 2863 2864 Each flag has a specific descriptor that appears in btmon trace. This 2865 simple checker validates that the desired flag descriptors appear in 2866 btmon trace when the advertisement was registered. 2867 2868 @param flag_strs: Flag string descriptors expected in btmon trace 2869 2870 #returns: True if all flag descriptors were located, False otherwise 2871 """ 2872 2873 for flag_str in flag_strs: 2874 if not self.bluetooth_le_facade.btmon_find(flag_str): 2875 logging.info( 2876 'Flag descriptor not located: {}'.format(flag_str)) 2877 return False 2878 2879 return True 2880 2881 def ext_adv_enabled(self): 2882 """ Check if platform supports extended advertising 2883 2884 @returns True if extended advertising is supported, else False 2885 """ 2886 platform = self.get_base_platform_name() 2887 return platform in EXT_ADV_MODELS 2888 2889 2890 @test_retry_and_log(False) 2891 def test_register_advertisement(self, advertisement_data, instance_id, 2892 min_adv_interval_ms, max_adv_interval_ms): 2893 """Verify that an advertisement is registered correctly. 2894 2895 This test verifies the following data: 2896 - advertisement added 2897 - manufacturer data 2898 - service UUIDs 2899 - service data 2900 - advertising intervals 2901 - advertising enabled 2902 2903 @param advertisement_data: the data of an advertisement to register. 2904 @param instance_id: the instance id which starts at 1. 2905 @param min_adv_interval_ms: min_adv_interval in milliseconds. 2906 @param max_adv_interval_ms: max_adv_interval in milliseconds. 2907 2908 @returns: True if the advertisement is registered correctly. 2909 False otherwise. 2910 2911 """ 2912 # When registering a new advertisement, it is possible that another 2913 # instance is advertising. It may need to wait for all other 2914 # advertisements to complete advertising once. 2915 self.count_advertisements += 1 2916 logging_timespan = self.compute_logging_timespan(max_adv_interval_ms) 2917 self._get_btmon_log( 2918 lambda: self.bluetooth_le_facade.register_advertisement( 2919 advertisement_data), 2920 logging_timespan=logging_timespan) 2921 2922 # Verify that a new advertisement is added. 2923 advertisement_added = ( 2924 self.bluetooth_le_facade.btmon_find('Advertising Added') and 2925 self.bluetooth_le_facade.btmon_find('Instance: %d' % 2926 instance_id)) 2927 2928 # Verify that the manufacturer data could be found. 2929 manufacturer_data = advertisement_data.get('ManufacturerData', '') 2930 manufacturer_data_found = True 2931 for manufacturer_id in manufacturer_data: 2932 # The 'not assigned' text below means the manufacturer id 2933 # is not actually assigned to any real manufacturer. 2934 # For real 16-bit manufacturer UUIDs, refer to 2935 # https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members 2936 manufacturer_data_found = self.bluetooth_le_facade.btmon_find( 2937 'Company: not assigned (%d)' % int(manufacturer_id, 16)) 2938 2939 # Verify that all service UUIDs could be found. 2940 service_uuids_found = True 2941 for uuid in advertisement_data.get('ServiceUUIDs', []): 2942 # Service UUIDs looks like ['0x180D', '0x180F'] 2943 # Heart Rate (0x180D) 2944 # Battery Service (0x180F) 2945 # For actual 16-bit service UUIDs, refer to 2946 # https://www.bluetooth.com/specifications/gatt/services 2947 if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid): 2948 service_uuids_found = False 2949 break 2950 2951 # Verify service data. 2952 service_data_found = True 2953 for uuid, data in advertisement_data.get('ServiceData', {}).items(): 2954 # A service data looks like 2955 # Service Data (UUID 0x9999): 0001020304 2956 # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04] 2957 data_str = ''.join(['%02x' % n for n in data]) 2958 if not self.bluetooth_le_facade.btmon_find( 2959 'Service Data (UUID 0x%s): %s' % (uuid, data_str)): 2960 service_data_found = False 2961 break 2962 2963 # Broadcast advertisements are overwritten in some kernel versions to 2964 # be more aggressive. Verify that the advertising intervals are correct 2965 # if this mode is not used 2966 if advertisement_data.get('Type') != 'broadcast': 2967 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 2968 self._verify_advertising_intervals(min_adv_interval_ms, 2969 max_adv_interval_ms)) 2970 2971 else: 2972 min_adv_interval_ms_found = True 2973 max_adv_interval_ms_found = True 2974 2975 scan_rsp_correct = self._verify_scan_response_data(advertisement_data) 2976 2977 # Verify advertising is enabled. 2978 advertising_enabled = self.bluetooth_le_facade.btmon_find( 2979 'Advertising: Enabled (0x01)') 2980 2981 self.results = { 2982 'advertisement_added': advertisement_added, 2983 'manufacturer_data_found': manufacturer_data_found, 2984 'service_uuids_found': service_uuids_found, 2985 'service_data_found': service_data_found, 2986 'min_adv_interval_ms_found': min_adv_interval_ms_found, 2987 'max_adv_interval_ms_found': max_adv_interval_ms_found, 2988 'scan_rsp_correct': scan_rsp_correct, 2989 'advertising_enabled': advertising_enabled, 2990 } 2991 return all(self.results.values()) 2992 2993 2994 @test_retry_and_log(False) 2995 def test_fail_to_register_advertisement(self, advertisement_data, 2996 min_adv_interval_ms, 2997 max_adv_interval_ms): 2998 """Verify that failure is incurred when max advertisements are reached. 2999 3000 This test verifies that a registration failure is incurred when 3001 max advertisements are reached. The error message looks like: 3002 3003 org.bluez.Error.Failed: Maximum advertisements reached 3004 3005 @param advertisement_data: the advertisement to register. 3006 @param min_adv_interval_ms: min_adv_interval in milliseconds. 3007 @param max_adv_interval_ms: max_adv_interval in milliseconds. 3008 3009 @returns: True if the error message is received correctly. 3010 False otherwise. 3011 3012 """ 3013 logging_timespan = self.compute_logging_timespan(max_adv_interval_ms) 3014 self._get_btmon_log( 3015 lambda: self.bluetooth_le_facade.register_advertisement( 3016 advertisement_data), 3017 logging_timespan=logging_timespan) 3018 3019 # Verify that it failed to register advertisement due to the fact 3020 # that max advertisements are reached. 3021 failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT 3022 in self.advertising_msg) 3023 3024 # Verify that no new advertisement is added. 3025 advertisement_not_added = not self.bluetooth_le_facade.btmon_find( 3026 'Advertising Added:') 3027 3028 self.results = { 3029 'failed_to_register_error': failed_to_register_error, 3030 'advertisement_not_added': advertisement_not_added, 3031 } 3032 3033 # If the registration fails and extended advertising is available, 3034 # there will be no events in btmon. Therefore, we only run this part of 3035 # the test if extended advertising is not available, indicating that 3036 # software advertisement rotation is being used. 3037 if not self.ext_adv_enabled(): 3038 # Verify that the advertising intervals are correct. 3039 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 3040 self._verify_advertising_intervals(min_adv_interval_ms, 3041 max_adv_interval_ms)) 3042 3043 # Verify advertising remains enabled. 3044 advertising_enabled = self.bluetooth_le_facade.btmon_find( 3045 'Advertising: Enabled (0x01)') 3046 3047 self.results.update({ 3048 'min_adv_interval_ms_found': min_adv_interval_ms_found, 3049 'max_adv_interval_ms_found': max_adv_interval_ms_found, 3050 'advertising_enabled': advertising_enabled, 3051 }) 3052 3053 return all(self.results.values()) 3054 3055 3056 @test_retry_and_log(False) 3057 def test_unregister_advertisement(self, advertisement_data, instance_id, 3058 advertising_disabled): 3059 """Verify that an advertisement is unregistered correctly. 3060 3061 This test verifies the following data: 3062 - advertisement removed 3063 - advertising status: enabled if there are advertisements left; 3064 disabled otherwise. 3065 3066 @param advertisement_data: the data of an advertisement to unregister. 3067 @param instance_id: the instance id of the advertisement to remove. 3068 @param advertising_disabled: is advertising disabled? This happens 3069 only when all advertisements are removed. 3070 3071 @returns: True if the advertisement is unregistered correctly. 3072 False otherwise. 3073 3074 """ 3075 self.count_advertisements -= 1 3076 self._get_btmon_log( 3077 lambda: self.bluetooth_le_facade.unregister_advertisement( 3078 advertisement_data)) 3079 3080 # Verify that the advertisement is removed. 3081 advertisement_removed = ( 3082 self.bluetooth_le_facade.btmon_find('Advertising Removed') and 3083 self.bluetooth_le_facade.btmon_find('Instance: %d' % 3084 instance_id)) 3085 3086 # If advertising_disabled is True, there should be no log like 3087 # 'Advertising: Enabled (0x01)' 3088 # If advertising_disabled is False, there should be log like 3089 # 'Advertising: Enabled (0x01)' 3090 3091 # Only need to check advertising status when the last advertisement 3092 # is removed. For any other advertisements prior to the last one, 3093 # we may or may not observe 'Advertising: Enabled (0x01)' message. 3094 # Hence, the test would become flaky if we insist to see that message. 3095 # A possible workaround is to sleep for a while and then check the 3096 # message. The drawback is that we may need to wait up to 10 seconds 3097 # if the advertising duration and intervals are long. 3098 # In a test case, we always run test_check_duration_and_intervals() 3099 # to check if advertising duration and intervals are correct after 3100 # un-registering one or all advertisements, it is safe to do so. 3101 advertising_enabled_found = self.bluetooth_le_facade.btmon_find( 3102 'Advertising: Enabled (0x01)') 3103 advertising_disabled_found = self.bluetooth_le_facade.btmon_find( 3104 'Advertising: Disabled (0x00)') 3105 advertising_status_correct = not advertising_disabled or ( 3106 advertising_disabled_found and not advertising_enabled_found) 3107 3108 self.results = { 3109 'advertisement_removed': advertisement_removed, 3110 'advertising_status_correct': advertising_status_correct, 3111 } 3112 return all(self.results.values()) 3113 3114 3115 @test_retry_and_log(False) 3116 def test_set_advertising_intervals(self, min_adv_interval_ms, 3117 max_adv_interval_ms): 3118 """Verify that new advertising intervals are set correctly. 3119 3120 Note that setting advertising intervals does not enable/disable 3121 advertising. Hence, there is no need to check the advertising 3122 status. 3123 3124 @param min_adv_interval_ms: the min advertising interval in ms. 3125 @param max_adv_interval_ms: the max advertising interval in ms. 3126 3127 @returns: True if the new advertising intervals are correct. 3128 False otherwise. 3129 3130 """ 3131 self._get_btmon_log( 3132 lambda: self.bluetooth_le_facade.set_advertising_intervals( 3133 min_adv_interval_ms, max_adv_interval_ms)) 3134 3135 # Verify the new advertising intervals. 3136 # With intervals of 200 ms and 200 ms, the log looks like 3137 # bluetoothd: Set Advertising Intervals: 0x0140, 0x0140 3138 txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' 3139 adv_intervals_found = self.bluetooth_le_facade.btmon_find( 3140 txt % (self.convert_to_adv_jiffies(min_adv_interval_ms), 3141 self.convert_to_adv_jiffies(max_adv_interval_ms))) 3142 3143 self.results = {'adv_intervals_found': adv_intervals_found} 3144 return all(self.results.values()) 3145 3146 3147 @test_retry_and_log(False) 3148 def test_fail_to_set_advertising_intervals( 3149 self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms, 3150 orig_min_adv_interval_ms, orig_max_adv_interval_ms): 3151 """Verify that setting invalid advertising intervals results in error. 3152 3153 If invalid min/max advertising intervals are given, it would incur 3154 the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'. 3155 Note that valid advertising intervals fall between 20 ms and 10,240 ms. 3156 3157 @param invalid_min_adv_interval_ms: the invalid min advertising interval 3158 in ms. 3159 @param invalid_max_adv_interval_ms: the invalid max advertising interval 3160 in ms. 3161 @param orig_min_adv_interval_ms: the original min advertising interval 3162 in ms. 3163 @param orig_max_adv_interval_ms: the original max advertising interval 3164 in ms. 3165 3166 @returns: True if it fails to set invalid advertising intervals. 3167 False otherwise. 3168 3169 """ 3170 self._get_btmon_log( 3171 lambda: self.bluetooth_le_facade.set_advertising_intervals( 3172 invalid_min_adv_interval_ms, 3173 invalid_max_adv_interval_ms)) 3174 3175 # Verify that the invalid error is observed in the dbus error callback 3176 # message. 3177 invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in 3178 self.advertising_msg) 3179 3180 # Verify that the min/max advertising intervals remain the same 3181 # after setting the invalid advertising intervals. 3182 # 3183 # In btmon log, we would see the following message first. 3184 # bluetoothd: Set Advertising Intervals: 0x0010, 0x0010 3185 # And then, we should check if "Min advertising interval" and 3186 # "Max advertising interval" remain the same. 3187 start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % ( 3188 self.convert_to_adv_jiffies(invalid_min_adv_interval_ms), 3189 self.convert_to_adv_jiffies(invalid_max_adv_interval_ms)) 3190 3191 search_strings = ['Min advertising interval:', 3192 'Max advertising interval:'] 3193 search_str = '|'.join(search_strings) 3194 3195 contents = self.bluetooth_le_facade.btmon_get(search_str=search_str, 3196 start_str=start_str) 3197 3198 # The min/max advertising intervals of all advertisements should remain 3199 # the same as the previous valid ones. 3200 min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec' 3201 min_max_pattern = re.compile(min_max_str) 3202 correct_orig_min_adv_interval = True 3203 correct_orig_max_adv_interval = True 3204 for line in contents: 3205 result = min_max_pattern.search(line) 3206 if result: 3207 interval = float(result.group(1)) 3208 if 'Min' in line and interval != orig_min_adv_interval_ms: 3209 correct_orig_min_adv_interval = False 3210 elif 'Max' in line and interval != orig_max_adv_interval_ms: 3211 correct_orig_max_adv_interval = False 3212 3213 self.results = { 3214 'invalid_intervals_error': invalid_intervals_error, 3215 'correct_orig_min_adv_interval': correct_orig_min_adv_interval, 3216 'correct_orig_max_adv_interval': correct_orig_max_adv_interval} 3217 3218 return all(self.results.values()) 3219 3220 3221 @test_retry_and_log(False) 3222 def test_check_advertising_intervals(self, min_adv_interval_ms, 3223 max_adv_interval_ms): 3224 """Verify that the advertising intervals are as expected. 3225 3226 @param min_adv_interval_ms: the min advertising interval in ms. 3227 @param max_adv_interval_ms: the max advertising interval in ms. 3228 3229 @returns: True if the advertising intervals are correct. 3230 False otherwise. 3231 3232 """ 3233 self._get_btmon_log(None) 3234 3235 # Verify that the advertising intervals are correct. 3236 min_adv_interval_ms_found, max_adv_interval_ms_found = ( 3237 self._verify_advertising_intervals(min_adv_interval_ms, 3238 max_adv_interval_ms)) 3239 3240 self.results = { 3241 'min_adv_interval_ms_found': min_adv_interval_ms_found, 3242 'max_adv_interval_ms_found': max_adv_interval_ms_found, 3243 } 3244 return all(self.results.values()) 3245 3246 3247 @test_retry_and_log(False) 3248 def test_reset_advertising(self, instance_ids=[]): 3249 """Verify that advertising is reset correctly. 3250 3251 Note that reset advertising would set advertising intervals to 3252 the default values. However, we would not be able to observe 3253 the values change until new advertisements are registered. 3254 Therefore, it is required that a test_register_advertisement() 3255 test is conducted after this test. 3256 3257 If instance_ids is [], all advertisements would still be removed 3258 if there are any. However, no need to check 'Advertising Removed' 3259 in btmon log since we may or may not be able to observe the message. 3260 This feature is needed if this test is invoked as the first one in 3261 a test case to reset advertising. In this situation, this test does 3262 not know how many advertisements exist. 3263 3264 @param instance_ids: the list of instance IDs that should be removed. 3265 3266 @returns: True if advertising is reset correctly. 3267 False otherwise. 3268 3269 """ 3270 self.count_advertisements = 0 3271 self._get_btmon_log( 3272 lambda: self.bluetooth_le_facade.reset_advertising()) 3273 3274 # Verify that every advertisement is removed. When an advertisement 3275 # with instance id 1 is removed, the log looks like 3276 # Advertising Removed 3277 # instance: 1 3278 if len(instance_ids) > 0: 3279 advertisement_removed = self.bluetooth_le_facade.btmon_find( 3280 'Advertising Removed') 3281 if advertisement_removed: 3282 for instance_id in instance_ids: 3283 txt = 'Instance: %d' % instance_id 3284 if not self.bluetooth_le_facade.btmon_find(txt): 3285 advertisement_removed = False 3286 break 3287 else: 3288 advertisement_removed = True 3289 3290 if not advertisement_removed: 3291 logging.error('Failed to remove advertisement') 3292 3293 # Verify that "Reset Advertising Intervals" command has been issued. 3294 reset_advertising_intervals = self.bluetooth_le_facade.btmon_find( 3295 'bluetoothd: Reset Advertising Intervals') 3296 3297 # Verify the advertising is disabled. 3298 advertising_disabled_observied = self.bluetooth_le_facade.btmon_find( 3299 'Advertising: Disabled') 3300 # If there are no existing advertisements, we may not observe 3301 # 'Advertising: Disabled'. 3302 advertising_disabled = (instance_ids == [] or 3303 advertising_disabled_observied) 3304 3305 self.results = { 3306 'advertisement_removed': advertisement_removed, 3307 'reset_advertising_intervals': reset_advertising_intervals, 3308 'advertising_disabled': advertising_disabled, 3309 } 3310 return all(self.results.values()) 3311 3312 3313 @test_retry_and_log(False) 3314 def test_receive_advertisement(self, address=None, UUID=None, timeout=10): 3315 """Verifies that we receive an advertisement with specific contents 3316 3317 Since test_discover_device only uses the existence of a device dbus path 3318 to indicate when a device is discovered, it is not adequate if we want 3319 to verify that we have received an advertisement from a device. This 3320 test monitors btmon around a discovery instance and searches for the 3321 relevant advertising report. 3322 3323 @param address: String address of peer 3324 @param UUID: String of hex data 3325 @param timeout: seconds to listen for traffic 3326 3327 @returns True if report was located, otherwise False 3328 """ 3329 3330 def _discover_devices(): 3331 self.test_start_discovery() 3332 time.sleep(timeout) 3333 self.test_stop_discovery() 3334 3335 # Run discovery, record btmon log 3336 self._get_btmon_log(_discover_devices) 3337 3338 # Grab all logs received 3339 btmon_log = '\n'.join(self.bluetooth_le_facade.btmon_get('', '')) 3340 3341 desired_strs = [] 3342 3343 if address is not None: 3344 desired_strs.append('Address: {}'.format(address)) 3345 3346 if UUID is not None: 3347 desired_strs.append('({})'.format(UUID)) 3348 3349 # Split btmon events by HCI and MGMT delimiters 3350 event_delimiter = '|'.join(['@ MGMT', '> HCI', '< HCI']) 3351 btmon_events = re.split(event_delimiter, btmon_log) 3352 3353 features_located = False 3354 3355 for event_str in btmon_events: 3356 if 'Advertising Report' not in event_str: 3357 continue 3358 3359 for desired_str in desired_strs: 3360 if desired_str not in event_str: 3361 break 3362 3363 else: 3364 features_located = True 3365 3366 self.results = { 3367 'features_located': features_located, 3368 } 3369 return all(self.results.values()) 3370 3371 3372 def add_device(self, address, address_type, action): 3373 """Add a device to the Kernel action list.""" 3374 return self.bluetooth_facade.add_device(address, address_type, action) 3375 3376 3377 def remove_device(self, address, address_type): 3378 """Remove a device from the Kernel action list.""" 3379 return self.bluetooth_facade.remove_device(address,address_type) 3380 3381 3382 def read_supported_commands(self): 3383 """Read the set of supported commands from the Kernel.""" 3384 return self.bluetooth_facade.read_supported_commands() 3385 3386 3387 def read_info(self): 3388 """Read the adapter information from the Kernel.""" 3389 return self.bluetooth_facade.read_info() 3390 3391 3392 def get_adapter_properties(self): 3393 """Read the adapter properties from the Bluetooth Daemon.""" 3394 return self.bluetooth_facade.get_adapter_properties() 3395 3396 3397 def get_dev_info(self): 3398 """Read raw HCI device information.""" 3399 return self.bluetooth_facade.get_dev_info() 3400 3401 def log_settings(self, msg, settings): 3402 """function convert MGMT_OP_READ_INFO settings to string 3403 3404 @param msg: string to include in output 3405 @param settings: bitstring returned by MGMT_OP_READ_INFO 3406 @return : List of strings indicating different settings 3407 """ 3408 strs = [] 3409 if settings & bluetooth_socket.MGMT_SETTING_POWERED: 3410 strs.append("POWERED") 3411 if settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE: 3412 strs.append("CONNECTABLE") 3413 if settings & bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE: 3414 strs.append("FAST-CONNECTABLE") 3415 if settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE: 3416 strs.append("DISCOVERABLE") 3417 if settings & bluetooth_socket.MGMT_SETTING_PAIRABLE: 3418 strs.append("PAIRABLE") 3419 if settings & bluetooth_socket.MGMT_SETTING_LINK_SECURITY: 3420 strs.append("LINK-SECURITY") 3421 if settings & bluetooth_socket.MGMT_SETTING_SSP: 3422 strs.append("SSP") 3423 if settings & bluetooth_socket.MGMT_SETTING_BREDR: 3424 strs.append("BR/EDR") 3425 if settings & bluetooth_socket.MGMT_SETTING_HS: 3426 strs.append("HS") 3427 if settings & bluetooth_socket.MGMT_SETTING_LE: 3428 strs.append("LE") 3429 logging.debug('%s : %s', msg, " ".join(strs)) 3430 return strs 3431 3432 def log_flags(self, msg, flags): 3433 """Function to convert HCI state configuration to a string 3434 3435 @param msg: string to include in output 3436 @param settings: bitstring returned by get_dev_info 3437 @return : List of strings indicating different flags 3438 """ 3439 strs = [] 3440 if flags & bluetooth_socket.HCI_UP: 3441 strs.append("UP") 3442 else: 3443 strs.append("DOWN") 3444 if flags & bluetooth_socket.HCI_INIT: 3445 strs.append("INIT") 3446 if flags & bluetooth_socket.HCI_RUNNING: 3447 strs.append("RUNNING") 3448 if flags & bluetooth_socket.HCI_PSCAN: 3449 strs.append("PSCAN") 3450 if flags & bluetooth_socket.HCI_ISCAN: 3451 strs.append("ISCAN") 3452 if flags & bluetooth_socket.HCI_AUTH: 3453 strs.append("AUTH") 3454 if flags & bluetooth_socket.HCI_ENCRYPT: 3455 strs.append("ENCRYPT") 3456 if flags & bluetooth_socket.HCI_INQUIRY: 3457 strs.append("INQUIRY") 3458 if flags & bluetooth_socket.HCI_RAW: 3459 strs.append("RAW") 3460 logging.debug('%s [HCI]: %s', msg, " ".join(strs)) 3461 return strs 3462 3463 3464 @test_retry_and_log(False) 3465 def test_service_resolved(self, address): 3466 """Test that the services under device address can be resolved 3467 3468 @param address: MAC address of a device 3469 3470 @returns: True if the ServicesResolved property is changed before 3471 timeout, False otherwise. 3472 3473 """ 3474 is_resolved_func = self.bluetooth_facade.device_services_resolved 3475 return self._wait_for_condition(lambda : is_resolved_func(address),\ 3476 method_name()) 3477 3478 3479 @test_retry_and_log(False) 3480 def test_gatt_browse(self, address): 3481 """Test that the GATT client can get the attributes correctly 3482 3483 @param address: MAC address of a device 3484 3485 @returns: True if the attribute map received by GATT client is the same 3486 as expected. False otherwise. 3487 3488 """ 3489 3490 gatt_client_facade = GATT_ClientFacade(self.bluetooth_facade) 3491 actual_app = gatt_client_facade.browse(address) 3492 expected_app = GATT_HIDApplication() 3493 diff = GATT_Application.diff(actual_app, expected_app) 3494 3495 self.result = { 3496 'actural_result': actual_app, 3497 'expected_result': expected_app 3498 } 3499 3500 gatt_attribute_hierarchy = ['Device', 'Service', 'Characteristic', 3501 'Descriptor'] 3502 # Remove any difference in object path 3503 for parent, child in zip(gatt_attribute_hierarchy, 3504 gatt_attribute_hierarchy[1:]): 3505 pattern = re.compile('^%s .* is different in %s' % (child, parent)) 3506 for diff_str in diff[::]: 3507 if pattern.search(diff_str): 3508 diff.remove(diff_str) 3509 3510 if len(diff) != 0: 3511 logging.error('Application Diff: %s', diff) 3512 return False 3513 return True 3514 3515 3516 def _record_input_events(self, device, gesture, address=None): 3517 """Record the input events. 3518 3519 @param device: the bluetooth HID device. 3520 @param gesture: the gesture method to perform. 3521 3522 @returns: the input events received on the DUT. 3523 3524 """ 3525 self.input_facade.initialize_input_recorder(device.name, uniq=address) 3526 self.input_facade.start_input_recorder(device.name) 3527 time.sleep(self.HID_REPORT_SLEEP_SECS) 3528 gesture() 3529 time.sleep(self.HID_REPORT_SLEEP_SECS) 3530 self.input_facade.stop_input_recorder(device.name) 3531 time.sleep(self.HID_REPORT_SLEEP_SECS) 3532 event_values = self.input_facade.get_input_events(device.name) 3533 events = [Event(*ev) for ev in event_values] 3534 return events 3535 3536 3537 # ------------------------------------------------------------------- 3538 # Bluetooth mouse related tests 3539 # ------------------------------------------------------------------- 3540 3541 3542 def _test_mouse_click(self, device, button): 3543 """Test that the mouse click events could be received correctly. 3544 3545 @param device: the meta device containing a bluetooth HID device 3546 @param button: which button to test, 'LEFT' or 'RIGHT' 3547 3548 @returns: True if the report received by the host matches the 3549 expected one. False otherwise. 3550 3551 """ 3552 if button == 'LEFT': 3553 gesture = device.LeftClick 3554 elif button == 'RIGHT': 3555 gesture = device.RightClick 3556 else: 3557 raise error.TestError('Button (%s) is not valid.' % button) 3558 3559 actual_events = self._record_input_events(device, 3560 gesture, 3561 address=device.address) 3562 3563 linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT} 3564 expected_events = [ 3565 # Button down 3566 recorder.MSC_SCAN_BTN_EVENT[button], 3567 Event(EV_KEY, linux_input_button[button], 1), 3568 recorder.SYN_EVENT, 3569 # Button up 3570 recorder.MSC_SCAN_BTN_EVENT[button], 3571 Event(EV_KEY, linux_input_button[button], 0), 3572 recorder.SYN_EVENT] 3573 3574 self.results = { 3575 'actual_events': list(map(str, actual_events)), 3576 'expected_events': list(map(str, expected_events))} 3577 return actual_events == expected_events 3578 3579 3580 @test_retry_and_log 3581 def test_mouse_left_click(self, device): 3582 """Test that the mouse left click events could be received correctly. 3583 3584 @param device: the meta device containing a bluetooth HID device 3585 3586 @returns: True if the report received by the host matches the 3587 expected one. False otherwise. 3588 3589 """ 3590 return self._test_mouse_click(device, 'LEFT') 3591 3592 3593 @test_retry_and_log 3594 def test_mouse_right_click(self, device): 3595 """Test that the mouse right click events could be received correctly. 3596 3597 @param device: the meta device containing a bluetooth HID device 3598 3599 @returns: True if the report received by the host matches the 3600 expected one. False otherwise. 3601 3602 """ 3603 return self._test_mouse_click(device, 'RIGHT') 3604 3605 3606 def _test_mouse_move(self, device, delta_x=0, delta_y=0): 3607 """Test that the mouse move events could be received correctly. 3608 3609 @param device: the meta device containing a bluetooth HID device 3610 @param delta_x: the distance to move cursor in x axis 3611 @param delta_y: the distance to move cursor in y axis 3612 3613 @returns: True if the report received by the host matches the 3614 expected one. False otherwise. 3615 3616 """ 3617 gesture = lambda: device.Move(delta_x, delta_y) 3618 actual_events = self._record_input_events(device, 3619 gesture, 3620 address=device.address) 3621 3622 events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else [] 3623 events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else [] 3624 expected_events = events_x + events_y + [recorder.SYN_EVENT] 3625 3626 self.results = { 3627 'actual_events': list(map(str, actual_events)), 3628 'expected_events': list(map(str, expected_events))} 3629 return actual_events == expected_events 3630 3631 3632 @test_retry_and_log 3633 def test_mouse_move_in_x(self, device, delta_x): 3634 """Test that the mouse move events in x could be received correctly. 3635 3636 @param device: the meta device containing a bluetooth HID device 3637 @param delta_x: the distance to move cursor in x axis 3638 3639 @returns: True if the report received by the host matches the 3640 expected one. False otherwise. 3641 3642 """ 3643 return self._test_mouse_move(device, delta_x=delta_x) 3644 3645 3646 @test_retry_and_log 3647 def test_mouse_move_in_y(self, device, delta_y): 3648 """Test that the mouse move events in y could be received correctly. 3649 3650 @param device: the meta device containing a bluetooth HID device 3651 @param delta_y: the distance to move cursor in y axis 3652 3653 @returns: True if the report received by the host matches the 3654 expected one. False otherwise. 3655 3656 """ 3657 return self._test_mouse_move(device, delta_y=delta_y) 3658 3659 3660 @test_retry_and_log 3661 def test_mouse_move_in_xy(self, device, delta_x, delta_y): 3662 """Test that the mouse move events could be received correctly. 3663 3664 @param device: the meta device containing a bluetooth HID device 3665 @param delta_x: the distance to move cursor in x axis 3666 @param delta_y: the distance to move cursor in y axis 3667 3668 @returns: True if the report received by the host matches the 3669 expected one. False otherwise. 3670 3671 """ 3672 return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y) 3673 3674 3675 def _test_mouse_scroll(self, device, units): 3676 """Test that the mouse wheel events could be received correctly. 3677 3678 @param device: the meta device containing a bluetooth HID device 3679 @param units: the units to scroll in y axis 3680 3681 @returns: True if the report received by the host matches the 3682 expected one. False otherwise. 3683 3684 """ 3685 gesture = lambda: device.Scroll(units) 3686 recorded_events = self._record_input_events(device, 3687 gesture, 3688 address=device.address) 3689 3690 # Since high-speed scrolling events are inserted after they are passed 3691 # through bluetooth module, we ignore these events since they are 3692 # irrelevant for us 3693 scroll_events = [ev for ev in recorded_events 3694 if ev.code != REL_WHEEL_HI_RES] 3695 3696 expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT] 3697 self.results = { 3698 'scroll_events': list(map(str, scroll_events)), 3699 'expected_events': list(map(str, expected_events))} 3700 return scroll_events == expected_events 3701 3702 3703 @test_retry_and_log 3704 def test_mouse_scroll_down(self, device, delta_y): 3705 """Test that the mouse wheel events could be received correctly. 3706 3707 @param device: the meta device containing a bluetooth HID device 3708 @param delta_y: the units to scroll down in y axis; 3709 should be a postive value 3710 3711 @returns: True if the report received by the host matches the 3712 expected one. False otherwise. 3713 3714 """ 3715 if delta_y > 0: 3716 return self._test_mouse_scroll(device, delta_y) 3717 else: 3718 raise error.TestError('delta_y (%d) should be a positive value', 3719 delta_y) 3720 3721 3722 @test_retry_and_log 3723 def test_mouse_scroll_up(self, device, delta_y): 3724 """Test that the mouse wheel events could be received correctly. 3725 3726 @param device: the meta device containing a bluetooth HID device 3727 @param delta_y: the units to scroll up in y axis; 3728 should be a postive value 3729 3730 @returns: True if the report received by the host matches the 3731 expected one. False otherwise. 3732 3733 """ 3734 if delta_y > 0: 3735 return self._test_mouse_scroll(device, -delta_y) 3736 else: 3737 raise error.TestError('delta_y (%d) should be a positive value', 3738 delta_y) 3739 3740 3741 @test_retry_and_log 3742 def test_mouse_click_and_drag(self, device, delta_x, delta_y): 3743 """Test that the mouse click-and-drag events could be received 3744 correctly. 3745 3746 @param device: the meta device containing a bluetooth HID device 3747 @param delta_x: the distance to drag in x axis 3748 @param delta_y: the distance to drag in y axis 3749 3750 @returns: True if the report received by the host matches the 3751 expected one. False otherwise. 3752 3753 """ 3754 gesture = lambda: device.ClickAndDrag(delta_x, delta_y) 3755 actual_events = self._record_input_events(device, 3756 gesture, 3757 address=device.address) 3758 3759 button = 'LEFT' 3760 expected_events = ( 3761 [# Button down 3762 recorder.MSC_SCAN_BTN_EVENT[button], 3763 Event(EV_KEY, BTN_LEFT, 1), 3764 recorder.SYN_EVENT] + 3765 # cursor movement in x and y 3766 ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) + 3767 ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) + 3768 [recorder.SYN_EVENT] + 3769 # Button up 3770 [recorder.MSC_SCAN_BTN_EVENT[button], 3771 Event(EV_KEY, BTN_LEFT, 0), 3772 recorder.SYN_EVENT]) 3773 3774 self.results = { 3775 'actual_events': list(map(str, actual_events)), 3776 'expected_events': list(map(str, expected_events))} 3777 return actual_events == expected_events 3778 3779 3780 # ------------------------------------------------------------------- 3781 # Bluetooth keyboard related tests 3782 # ------------------------------------------------------------------- 3783 3784 # TODO may be deprecated as stated in b:140515628 3785 @test_retry_and_log 3786 def test_keyboard_input_from_string(self, device, string_to_send): 3787 """Test that the keyboard's key events could be received correctly. 3788 3789 @param device: the meta device containing a bluetooth HID device 3790 @param string_to_send: the set of keys that will be pressed one-by-one 3791 3792 @returns: True if the report received by the host matches the 3793 expected one. False otherwise. 3794 3795 """ 3796 3797 gesture = lambda: device.KeyboardSendString(string_to_send) 3798 3799 actual_events = self._record_input_events(device, 3800 gesture, 3801 address=device.address) 3802 3803 resulting_string = bluetooth_test_utils.reconstruct_string( 3804 actual_events) 3805 3806 return string_to_send == resulting_string 3807 3808 3809 @test_retry_and_log 3810 def test_keyboard_input_from_trace(self, device, trace_name): 3811 """ Tests that keyboard events can be transmitted and received correctly 3812 3813 @param device: the meta device containing a bluetooth HID device 3814 @param trace_name: string name for keyboard activity trace to be used 3815 in the test i.e. "simple_text" 3816 3817 @returns: true if the recorded output matches the expected output 3818 false otherwise 3819 """ 3820 length_correct = True 3821 content_correct = True 3822 3823 # Read data from trace I/O files 3824 input_trace = bluetooth_test_utils.parse_trace_file(os.path.join( 3825 TRACE_LOCATION, '{}_input.txt'.format(trace_name))) 3826 output_trace = bluetooth_test_utils.parse_trace_file(os.path.join( 3827 TRACE_LOCATION, '{}_output.txt'.format(trace_name))) 3828 3829 if not input_trace or not output_trace: 3830 logging.error('Failure in using trace') 3831 return False 3832 3833 # Disregard timing data for now 3834 input_scan_codes = [tup[1] for tup in input_trace] 3835 predicted_events = [Event(*tup[1]) for tup in output_trace] 3836 3837 # Create and run this trace as a gesture 3838 gesture = lambda: device.KeyboardSendTrace(input_scan_codes) 3839 rec_events = self._record_input_events(device, 3840 gesture, 3841 address=device.address) 3842 3843 # Filter out any input events that were not from the keyboard 3844 rec_key_events = [ev for ev in rec_events if ev.type == EV_KEY] 3845 3846 # Fail if we didn't record the correct number of events 3847 if len(rec_key_events) != len(input_scan_codes): 3848 logging.info('Expected {} events, received {}'.format( 3849 len(input_scan_codes), len(rec_key_events))) 3850 length_correct = False 3851 3852 for idx, predicted in enumerate(predicted_events): 3853 recorded = rec_key_events[idx] 3854 3855 if not predicted == recorded: 3856 content_correct = False 3857 break 3858 3859 self.results = { 3860 'received_events': len(rec_key_events) > 0, 3861 'length_correct': length_correct, 3862 'content_correct': content_correct, 3863 } 3864 3865 return all(self.results) 3866 3867 3868 def is_newer_kernel_version(self, version, minimum_version): 3869 """ Check if given kernel version is newer than unsupported version.""" 3870 3871 return utils.compare_versions(version, minimum_version) >= 0 3872 3873 3874 def is_supported_kernel_version(self, kernel_version, minimum_version, 3875 msg=None): 3876 """ Check if kernel version is greater than minimum version. 3877 3878 Check if given kernel version is greater than or equal to minimum 3879 version. Raise TEST_NA if given kernel version is lower than the 3880 minimum version. 3881 3882 Note: Kernel version may have suffixes, so ensure that minimum 3883 version should be the smallest version that is permissible. 3884 Ex: If minimum version is 3.8.11 then 3.8.11-<random> will 3885 pass the check. 3886 3887 @param kernel_version: kernel version to be checked as a string 3888 @param: minimum_version: minimum kernel version requried 3889 3890 @returns: None 3891 3892 @raises: TEST_NA if kernel version is not greater than the minimum 3893 version 3894 """ 3895 3896 logging.debug('kernel version is {} minimum version' 3897 'is {}'.format(kernel_version,minimum_version)) 3898 3899 if msg is None: 3900 msg = 'Test not supported on this kernel version' 3901 3902 if not self.is_newer_kernel_version(kernel_version, minimum_version): 3903 logging.info('Kernel version check failed: %s', msg) 3904 raise error.TestNAError(msg) 3905 3906 logging.debug('Kernel version check passed') 3907 3908 3909 # ------------------------------------------------------------------- 3910 # Bluetooth AVRCP related test 3911 # ------------------------------------------------------------------- 3912 3913 3914 @test_retry_and_log 3915 def test_avrcp_event(self, device, generator, avrcp_event): 3916 """Tests that AVRCP events can be transmitted and received correctly 3917 3918 @param device: the meta device containing a Bluetooth AVRCP capable 3919 audio device. 3920 @param generator: the peer device generator/function which trigger 3921 the AVRCP event. 3922 @param avrcp_event: the AVRCP event to test. 3923 3924 @returns: true if the recorded output matches the expected output 3925 false otherwise 3926 """ 3927 logging.debug('AVRCP Event Test, Event: %s', avrcp_event) 3928 linux_input_button = {'play': KEY_PLAYCD, 'pause': KEY_PAUSECD, 3929 'stop': KEY_STOPCD, 'next': KEY_NEXTSONG, 3930 'previous': KEY_PREVIOUSSONG} 3931 expected_event = [ 3932 # Button down 3933 Event(EV_KEY, linux_input_button[avrcp_event], 1), 3934 recorder.SYN_EVENT, 3935 # Button up 3936 Event(EV_KEY, linux_input_button[avrcp_event], 0), 3937 recorder.SYN_EVENT] 3938 3939 gesture = lambda: generator(avrcp_event) 3940 actual_event = self._record_input_events(device, gesture) 3941 3942 return actual_event == expected_event 3943 3944 3945 # ------------------------------------------------------------------- 3946 # Servod related tests 3947 # ------------------------------------------------------------------- 3948 3949 @test_retry_and_log 3950 def test_power_consumption(self, device, max_power_mw): 3951 """Test the average power consumption.""" 3952 power_mw = device.servod.MeasurePowerConsumption() 3953 self.results = {'power_mw': power_mw} 3954 3955 if (power_mw is None): 3956 logging.error('Failed to measure power consumption') 3957 return False 3958 3959 power_mw = float(power_mw) 3960 logging.info('power consumption (mw): %f (max allowed: %f)', 3961 power_mw, max_power_mw) 3962 3963 return power_mw <= max_power_mw 3964 3965 3966 @test_retry_and_log 3967 def test_start_notify(self, object_path, cccd_value): 3968 """Test that a notification can be started on a characteristic 3969 3970 @param object_path: the object path of the characteristic. 3971 @param cccd_value: Possible CCCD values include 3972 0x00 - inferred from the remote characteristic's properties 3973 0x01 - notification 3974 0x02 - indication 3975 3976 @returns: The test results. 3977 3978 """ 3979 if object_path is None: 3980 logging.error('Invalid object path') 3981 return False 3982 3983 start_notify = self.bluetooth_facade.start_notify( 3984 object_path, cccd_value) 3985 is_notifying = self._wait_for_condition( 3986 lambda: self.bluetooth_facade.is_notifying( 3987 object_path), method_name()) 3988 3989 self.results = { 3990 'start_notify': start_notify, 3991 'is_notifying': is_notifying} 3992 3993 return all(self.results.values()) 3994 3995 3996 @test_retry_and_log 3997 def test_stop_notify(self, object_path): 3998 """Test that a notification can be stopped on a characteristic 3999 4000 @param object_path: the object path of the characteristic. 4001 4002 @returns: The test results. 4003 4004 """ 4005 if object_path is None: 4006 logging.error('Invalid object path') 4007 return False 4008 4009 stop_notify = self.bluetooth_facade.stop_notify(object_path) 4010 is_not_notifying = self._wait_for_condition( 4011 lambda: not self.bluetooth_facade.is_notifying( 4012 object_path), method_name()) 4013 4014 self.results = { 4015 'stop_notify': stop_notify, 4016 'is_not_notifying': is_not_notifying} 4017 4018 return all(self.results.values()) 4019 4020 4021 @test_retry_and_log(False) 4022 def test_set_discovery_filter(self, filter): 4023 """Test set discovery filter""" 4024 4025 return self.bluetooth_facade.set_discovery_filter(filter) 4026 4027 4028 @test_retry_and_log(False) 4029 def test_set_le_connection_parameters(self, address, parameters): 4030 """Test set LE connection parameters""" 4031 4032 return self.bluetooth_facade.set_le_connection_parameters( 4033 address, parameters) 4034 4035 4036 @test_retry_and_log(False) 4037 def test_get_connection_info(self, address): 4038 """Test that connection info to device is retrievable.""" 4039 4040 return (self.bluetooth_facade.get_connection_info(address) 4041 is not None) 4042 4043 4044 @test_retry_and_log(False, messages_stop=False) 4045 def test_suspend_and_wait_for_sleep(self, suspend, sleep_timeout): 4046 """ Suspend the device and wait until it is sleeping. 4047 4048 @param suspend: Sub-process that does the actual suspend call. 4049 @param sleep_timeout time limit in seconds to allow the host sleep. 4050 4051 @return True if host is asleep within a short timeout, False otherwise. 4052 """ 4053 suspend.start() 4054 try: 4055 self.host.test_wait_for_sleep(sleep_timeout) 4056 except Exception as e: 4057 suspend.join() 4058 self.results = {'exception': str(e)} 4059 return False 4060 4061 return True 4062 4063 4064 @test_retry_and_log(False, messages_start=False) 4065 def test_wait_for_resume(self, 4066 boot_id, 4067 suspend, 4068 resume_timeout, 4069 test_start_time, 4070 resume_slack=RESUME_DELTA, 4071 fail_on_timeout=False, 4072 fail_early_wake=True): 4073 """ Wait for device to resume from suspend. 4074 4075 @param boot_id: Current boot id 4076 @param suspend: Sub-process that does actual suspend call. 4077 @param resume_timeout: Expect device to resume in given timeout. 4078 @param test_start_time: When was this test started? (device time) 4079 @param resume_slack: Allow some slack on resume timeout. 4080 @param fail_on_timeout: Fails if timeout is reached 4081 @param fail_early_wake: Fails if timeout isn't reached 4082 4083 @return True if suspend sub-process completed without error. 4084 """ 4085 success = False 4086 results = {} 4087 4088 def _check_timeout(delta): 4089 if delta > timedelta(seconds=resume_timeout): 4090 return not fail_on_timeout 4091 else: 4092 return not fail_early_wake 4093 4094 def _check_suspend_attempt_or_raise(test_start, wake_at): 4095 """Make sure suspend attempt was recent or raise TestNA. 4096 4097 If we're looking at a previous suspend attempt, it means the test 4098 didn't trigger a suspend properly (i.e. no powerd call) 4099 4100 @param test_start: When we started the test. 4101 @param wake_at: When powerd suspend resumed. 4102 4103 @raises: error.TestNAError if found suspend occurred before we 4104 started the test. 4105 """ 4106 # If the last suspend attempt was before we started the test, it's 4107 # probably not a recent attempt. 4108 if wake_at < test_start: 4109 raise error.TestNAError( 4110 'No recent suspend attempt found. ' 4111 'Started test at {} but last suspend ended at {}'. 4112 format(test_start, wake_at)) 4113 4114 return True 4115 4116 def _check_retcode_or_raise(retcode): 4117 """Make sure powerd return was successful. 4118 4119 @param retcode: Return code of powerd_suspend. 4120 4121 @raises: error.TestNAError if failed suspend due to non-BT 4122 @return: False if BT woke us, True otherwise 4123 """ 4124 if retcode: 4125 if self.bluetooth_facade.bt_caused_last_resume(): 4126 return False 4127 else: 4128 raise error.TestNAError( 4129 'Failed suspend due to non-BT wake') 4130 4131 return True 4132 4133 # Sometimes it takes longer to resume from suspend; give some leeway 4134 resume_timeout = resume_timeout + resume_slack 4135 results['resume timeout'] = resume_timeout 4136 try: 4137 start = datetime.now() 4138 4139 # Wait for resume needs to wait longer in case device rebooted. 4140 # Otherwise, the test will fail with errno 111 (connection refused) 4141 self.host.test_wait_for_resume( 4142 boot_id, resume_timeout=self.RESUME_INTERNAL_TIMEOUT_SECS) 4143 4144 results['device accessible on resume'] = True 4145 4146 # As of now, a timeout in test_wait_for_resume doesn't raise. Start 4147 # by first measuring the delta until network is back up to the dut. 4148 network_delta = datetime.now() - start 4149 4150 # Use powerd logs to see how much time we actually spent in suspend 4151 # If the network went down during suspend, we will have spent less 4152 # time in suspend than expected. If we can't find info via powerd, 4153 # we can use measured time instead. 4154 info = self.bluetooth_facade.find_last_suspend_via_powerd_logs() 4155 if info: 4156 (start_suspend_at, end_suspend_at, retcode) = info 4157 actual_delta = end_suspend_at - start_suspend_at 4158 results['powerd time to resume'] = actual_delta.total_seconds() 4159 results['powerd retcode'] = retcode 4160 4161 # Resume is successful if suspend occurred correctly and woke up 4162 # within the timeout. One significant caveat is that we only 4163 # fail here if BT blocked suspend, not if we woke spuriously. 4164 # This is by design (we depend on the timeout to check for 4165 # spurious wakeup). 4166 success = _check_suspend_attempt_or_raise( 4167 test_start_time, 4168 end_suspend_at) and _check_retcode_or_raise( 4169 retcode) and _check_timeout(actual_delta) 4170 else: 4171 results['time to resume'] = network_delta.total_seconds() 4172 success = _check_timeout(network_delta) 4173 except error.TestFail as e: 4174 results['device accessible on resume'] = False 4175 success = False 4176 logging.error('wait_for_resume: %s', e) 4177 4178 # If the resume failed due to a reboot, raise the testFail and exit 4179 # early from the test 4180 if 'client rebooted' in str(e): 4181 raise 4182 finally: 4183 suspend.join() 4184 4185 results['success'] = success 4186 results['suspend exit code'] = suspend.exitcode 4187 self.results = results 4188 4189 return all([success, suspend.exitcode == 0]) 4190 4191 4192 def suspend_async(self, suspend_time, expect_bt_wake=False): 4193 """ Suspend asynchronously and return process for joining 4194 4195 @param suspend_time: how long to stay in suspend 4196 @param expect_bt_wake: Whether we expect bluetooth to wake us from 4197 suspend. If true, we expect this resume will occur early 4198 4199 @returns multiprocessing.Process object with suspend task 4200 """ 4201 4202 def _action_suspend(): 4203 try: 4204 self.bluetooth_facade.do_suspend(suspend_time, expect_bt_wake) 4205 except socket.error as e: 4206 # Socket errors may occur after suspend if the underlying 4207 # connection is lost during suspend (happens if usb-ethernet 4208 # disconnects and reconnects on resume). Catch all these errors 4209 # and swallow them. 4210 logging.warning( 4211 'Socket error on suspend. Swallowing error: %s', 4212 str(e)) 4213 return 0 4214 4215 proc = multiprocessing.Process(target=_action_suspend) 4216 proc.daemon = True 4217 return proc 4218 4219 4220 def device_connect_async(self, 4221 device_type, 4222 device, 4223 adapter_address, 4224 delay_wake=1, 4225 should_wake=True): 4226 """ Connects peer device asynchronously with DUT. 4227 4228 This function uses a thread instead of a subprocess so that the test 4229 result is stored for the test. Otherwise, the test connection was 4230 sometimes failing but the test itself was passing. 4231 4232 @param device_type: The device type (used to check if it's LE) 4233 @param device: the meta device with the peer device 4234 @param adapter_address: the address of the adapter 4235 @param delay_wake: delay wakeup by this many seconds 4236 @param should_wake: Should this cause a wakeup? 4237 4238 @returns threading.Thread object with device connect task 4239 """ 4240 4241 def _action_device_connect(): 4242 time.sleep(delay_wake) 4243 if 'BLE' in device_type: 4244 # LE reconnects by advertising (dut controller will create LE 4245 # connection, not the peer device) 4246 self.test_device_set_discoverable(device, True) 4247 else: 4248 # Classic requires peer to initiate a connection to wake up the 4249 # dut 4250 connect_func = self.test_connection_by_device_only 4251 if should_wake: 4252 connect_func(device, adapter_address) 4253 else: 4254 # If we're not expecting wake, this connect attempt will 4255 # probably fail. 4256 self.ignore_failure(connect_func, device, adapter_address) 4257 4258 thread = threading.Thread(target=_action_device_connect) 4259 return thread 4260 4261 4262 @test_retry_and_log(False) 4263 def test_hid_device_created(self, device_address): 4264 """ Tests that the hid device is created before using it for tests. 4265 4266 @param device_address: Address of peripheral device 4267 """ 4268 device_found = self.bluetooth_facade.wait_for_hid_device( 4269 device_address) 4270 self.results = { 4271 'device_found': device_found 4272 } 4273 return all(self.results.values()) 4274 4275 4276 @test_retry_and_log 4277 def test_battery_reporting(self, device): 4278 """ Tests that battery reporting through GATT can be received 4279 4280 @param device: the meta device containing a Bluetooth device 4281 4282 @returns: true if battery reporting is received 4283 """ 4284 4285 percentage = self.bluetooth_facade.get_battery_property( 4286 device.address, 'Percentage') 4287 4288 return percentage > 0 4289 4290 def _apply_new_adapter_alias(self, alias): 4291 """ Sets new system alias and applies discoverable setting 4292 4293 @param alias: string alias to be applied to Adapter->Alias property 4294 """ 4295 4296 # Set Adapter's Alias property 4297 self.bluetooth_facade.set_adapter_alias(alias) 4298 4299 # Set discoverable setting on 4300 self.bluetooth_facade.set_discoverable(True) 4301 4302 @test_retry_and_log(False) 4303 def test_set_adapter_alias(self, alias): 4304 """ Validates that a new adapter alias is applied correctly 4305 4306 @param alias: string alias to be applied to Adapter->Alias property 4307 4308 @returns: True if the applied alias is properly applied in btmon trace 4309 """ 4310 4311 orig_alias = self.get_adapter_properties()['Alias'] 4312 self.bluetooth_le_facade = self.bluetooth_facade 4313 4314 # 1. Capture btmon logs around alias set operation 4315 self._get_btmon_log(lambda: self._apply_new_adapter_alias(alias)) 4316 4317 # 2. Verify that name appears in btmon trace with the following format: 4318 # "Name (complete): Chromebook_BA0E" as appears in EIR data set 4319 expected_alias_str = 'Name (complete): ' + alias 4320 alias_found = self.bluetooth_facade.btmon_find(expected_alias_str) 4321 4322 # 3. Re-apply previous bluez alias as other tests expect default 4323 self.bluetooth_facade.set_adapter_alias(orig_alias) 4324 4325 self.results = {'alias_found': alias_found} 4326 return all(self.results.values()) 4327 4328 # ------------------------------------------------------------------- 4329 # Autotest methods 4330 # ------------------------------------------------------------------- 4331 4332 4333 def initialize(self): 4334 """Initialize bluetooth adapter tests.""" 4335 # Run through every tests and collect failed tests in self.fails. 4336 self.fails = [] 4337 4338 # If a test depends on multiple conditions, write the results of 4339 # the conditions in self.results so that it is easy to know 4340 # what conditions failed by looking at the log. 4341 self.results = None 4342 4343 # Some tests may instantiate a peripheral device for testing. 4344 self.devices = dict() 4345 self.shared_peers = [] 4346 for device_type in SUPPORTED_DEVICE_TYPES: 4347 self.devices[device_type] = list() 4348 4349 # The count of registered advertisements. 4350 self.count_advertisements = 0 4351 4352 4353 def update_btpeer(self): 4354 """ Check and update the chameleond bundle on Bluetooth peer 4355 Latest chameleond bundle and git commit is stored in the google cloud 4356 This function compares the git commit of the Bluetooth peers and update 4357 the peer if the commit does not match 4358 4359 @returns True: If all peer are updated to (or currently) in latest 4360 commit. False if any update fails 4361 4362 """ 4363 def _update_btpeer(): 4364 status = {} 4365 for peer in self.host.btpeer_list: 4366 status[peer] = {} 4367 status[peer]['update_needed'] = \ 4368 bluetooth_peer_update.is_update_needed(peer, commit) 4369 4370 logging.debug(status) 4371 if not any([v['update_needed'] for v in status.values()]): 4372 logging.info('No peer needed update') 4373 return True 4374 logging.debug('Atleast one peer needs update') 4375 4376 if not bluetooth_peer_update.download_installation_files(self.host, 4377 commit): 4378 logging.error('Unable to download installation files ') 4379 return False 4380 4381 # TODO(b:160782273) Make this parallel 4382 for peer in self.host.btpeer_list: 4383 if status[peer]['update_needed']: 4384 status[peer]['updated'], status[peer]['reason'] = \ 4385 bluetooth_peer_update.update_peer(peer, commit) 4386 4387 for peer, v in status.items(): 4388 if not v['update_needed']: 4389 logging.debug('peer %s did not need update', str(peer.host)) 4390 elif not v['updated']: 4391 logging.error('update peer %s failed %s', str(peer.host), 4392 v['reason']) 4393 else: 4394 logging.debug('peer %s updated successfully', 4395 str(peer.host)) 4396 4397 return all([v['updated'] for v in status.values() 4398 if v['update_needed']]) 4399 4400 try: 4401 commit = None 4402 (_, commit) = bluetooth_peer_update.get_latest_commit() 4403 if commit is None: 4404 logging.error('Unable to get current commit') 4405 return False 4406 4407 return _update_btpeer() 4408 except Exception as e: 4409 logging.error('Exception %s in update_btpeer', str(e)) 4410 return False 4411 finally: 4412 if not bluetooth_peer_update.cleanup(self.host, commit): 4413 logging.error('Update peer cleanup failed') 4414 4415 4416 def get_chipset_name(self): 4417 """ Get the name of BT/WiFi chipset on this host 4418 4419 @returns chipset name if successful else '' 4420 """ 4421 (vid,pid) = self.bluetooth_facade.get_wlan_vid_pid() 4422 logging.debug('Bluetooth module vid pid is %s %s', vid, pid) 4423 if vid is None or pid is None: 4424 # Controllers that aren't WLAN+BT combo chips does not expose 4425 # Vendor ID/Product ID. Use alternate method. 4426 # This will return one of ['WCN3991', ''] or a string containing 4427 # the name of chipset read from DUT 4428 return self.bluetooth_facade.get_bt_module_name() 4429 for name, l in CHIPSET_TO_VIDPID.items(): 4430 if (vid, pid) in l: 4431 return name 4432 return '' 4433 4434 4435 def verify_device_rssi(self, address_list): 4436 """ Test device rssi is over required threshold. 4437 4438 @param address_list: List of peer devices to verify address for 4439 4440 @raises error.TestNA if any device isn't found or RSSI is too low 4441 """ 4442 try: 4443 self.test_start_discovery() 4444 for device_address in address_list: 4445 # The RSSI property is only maintained while discovery is 4446 # enabled. Stopping discovery removes the property. Thus, look 4447 # up the RSSI without modifying discovery state. 4448 found = self.test_discover_device(device_address, 4449 start_discovery=False, 4450 stop_discovery=False) 4451 rssi = self.bluetooth_facade.get_device_property( 4452 device_address, 'RSSI') 4453 4454 if not found: 4455 logging.info('Failing with TEST_NA as peer %s was not' 4456 ' discovered', device_address) 4457 raise error.TestNAError( 4458 'Peer {} not discovered'.format(device_address)) 4459 4460 if not rssi or rssi < self.MIN_RSSI: 4461 logging.info('Failing with TEST_NA since RSSI (%s) is low ', 4462 rssi) 4463 raise error.TestNAError( 4464 'Peer {} RSSI is too low: {}'.format( 4465 device_address, rssi)) 4466 4467 logging.info('Peer {} RSSI {}'.format(device_address, rssi)) 4468 finally: 4469 self.test_stop_discovery() 4470 4471 4472 def verify_controller_capability(self, required_roles=[], 4473 test_type=''): 4474 """Raise an exception if required role support isn't present 4475 4476 @param required_roles: List of test role requirements in 4477 ["central", "peripheral", "central-peripheral"] 4478 4479 @raises: error.TestFail if device does not meet requirements 4480 AND test_type is 'AVL' 4481 error.TestNA if device does not meet requirements 4482 and test_type is not 'AVL' 4483 """ 4484 4485 adapter_props = self.get_adapter_properties() 4486 4487 supported_roles = adapter_props.get('Roles', []) 4488 4489 for req in required_roles: 4490 if req not in supported_roles: 4491 # We don't meet requirements, throw error 4492 msg = 'Role requirement {} not in supported modes {}'.format( 4493 req, supported_roles) 4494 4495 if test_type == 'AVL': 4496 raise error.TestFail(msg) 4497 4498 logging.info('Failing with TEST_NA due to %s', msg) 4499 raise error.TestNAError(msg) 4500 4501 4502 def set_fail_fast(self, args_dict, default=False): 4503 """Set whether the test should fail fast if running into any problem 4504 4505 By default it should not fail fast so that a batch test can continue 4506 running the rest after a failure in one test 4507 4508 :param args_dict: the arguments passed int from the command line 4509 :param default: the default value when the flag is missing from the 4510 args_dict 4511 4512 """ 4513 flag_name = 'fail_fast' 4514 if args_dict and flag_name in args_dict: 4515 self.fail_fast = bool(args_dict[flag_name].lower() == 'true') 4516 else: 4517 self.fail_fast = default 4518 4519 4520 def assert_discover_and_pair(self, device): 4521 """ Discovers and pairs given device. Automatically connects too. 4522 4523 If any of the test expressions fail, it will raise an error so only call 4524 this function as a setup for a test. 4525 """ 4526 self.assert_on_fail(self.test_device_set_discoverable(device, True)) 4527 self.assert_on_fail(self.test_discover_device(device.address)) 4528 self.assert_on_fail( 4529 self.test_pairing(device.address, device.pin, trusted=True)) 4530 4531 def run_once(self, *args, **kwargs): 4532 """This method should be implemented by children classes. 4533 4534 Typically, the run_once() method would look like: 4535 4536 factory = remote_facade_factory.RemoteFacadeFactory(host) 4537 self.bluetooth_facade = factory.create_bluetooth_facade() 4538 4539 self.test_bluetoothd_running() 4540 # ... 4541 # invoke more self.test_xxx() tests. 4542 # ... 4543 4544 if self.fails: 4545 raise error.TestFail(self.fails) 4546 4547 """ 4548 raise NotImplementedError 4549 4550 4551 def cleanup(self, test_state='END'): 4552 """Clean up bluetooth adapter tests. 4553 4554 @param test_state: string describing the requested clear is for 4555 a new test(NEW), the middle of the test(MID), 4556 or the end of the test(END). 4557 """ 4558 4559 if test_state == 'END': 4560 # Disable all the bluetooth debug logs 4561 self.enable_disable_debug_log(enable=False) 4562 4563 # Re-enable cellular services 4564 self.enable_disable_cellular(enable=True) 4565 4566 # Re-enable ui 4567 self.enable_disable_ui(enable=True) 4568 4569 if hasattr(self, 'host'): 4570 # Stop btmon process 4571 self.host.run('pkill btmon || true') 4572 4573 #Stop tcpdump usbmon process 4574 self.host.run('pkill tcpdump || true') 4575 4576 4577 # Close the device properly if a device is instantiated. 4578 # Note: do not write something like the following statements 4579 # if self.devices[device_type]: 4580 # or 4581 # if bool(self.devices[device_type]): 4582 # Otherwise, it would try to invoke bluetooth_mouse.__nonzero__() 4583 # which just does not exist. 4584 for device_name, device_list in self.devices.items(): 4585 for device in device_list: 4586 if device is not None: 4587 device.Close() 4588 4589 # Power cycle BT device if we're in the middle of a test 4590 if test_state == 'MID': 4591 device.PowerCycle() 4592 4593 self.devices = dict() 4594 for device_type in SUPPORTED_DEVICE_TYPES: 4595 self.devices[device_type] = list() 4596