1# Copyright 2016 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Server side bluetooth adapter subtests.""" 6 7import errno 8import functools 9import httplib 10import inspect 11import logging 12import os 13import re 14from socket import error as SocketError 15import time 16 17import bluetooth_test_utils 18 19from autotest_lib.client.bin import utils 20from autotest_lib.client.bin.input import input_event_recorder as recorder 21from autotest_lib.client.common_lib import error 22from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket 23from autotest_lib.client.cros.chameleon import chameleon 24from autotest_lib.server import test 25 26from autotest_lib.client.bin.input.linux_input import ( 27 BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL) 28from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import ( 29 GATT_ClientFacade, GATT_Application, GATT_HIDApplication) 30from autotest_lib.server.cros.multimedia import remote_facade_factory 31 32 33Event = recorder.Event 34 35# Location of data traces relative to this (bluetooth_adapter_tests.py) file 36BT_ADAPTER_TEST_PATH = os.path.dirname(__file__) 37TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard') 38 39# Delay binding the methods since host is only available at run time. 40SUPPORTED_DEVICE_TYPES = { 41 'MOUSE': lambda chameleon: chameleon.get_bluetooth_hid_mouse, 42 'KEYBOARD': lambda chameleon: chameleon.get_bluetooth_hid_keyboard, 43 'BLE_MOUSE': lambda chameleon: chameleon.get_ble_mouse, 44 'BLE_KEYBOARD': lambda chameleon: chameleon.get_ble_keyboard, 45 'A2DP_SINK': lambda chameleon: chameleon.get_bluetooth_a2dp_sink, 46 47 # This is a base object that does not emulate any Bluetooth device. 48 # This object is preferred when only a pure XMLRPC server is needed 49 # on the chameleon host, e.g., to perform servod methods. 
50 'BLUETOOTH_BASE': lambda chameleon: chameleon.get_bluetooth_base, 51} 52 53 54def method_name(): 55 """Get the method name of a class. 56 57 This function is supposed to be invoked inside a class and will 58 return current method name who invokes this function. 59 60 @returns: the string of the method name inside the class. 61 """ 62 return inspect.getouterframes(inspect.currentframe())[1][3] 63 64 65def _run_method(method, method_name, *args, **kwargs): 66 """Run a target method and capture exceptions if any. 67 68 This is just a wrapper of the target method so that we do not need to 69 write the exception capturing structure repeatedly. The method could 70 be either a device method or a facade method. 71 72 @param method: the method to run 73 @param method_name: the name of the method 74 75 @returns: the return value of target method() if successful. 76 False otherwise. 77 78 """ 79 result = False 80 try: 81 result = method(*args, **kwargs) 82 except Exception as e: 83 logging.error('%s: %s', method_name, e) 84 except: 85 logging.error('%s: unexpected error', method_name) 86 return result 87 88 89def get_bluetooth_emulated_device(chameleon, device_type): 90 """Get the bluetooth emulated device object. 91 92 @param chameleon: the chameleon host 93 @param device_type : the bluetooth device type, e.g., 'MOUSE' 94 95 @returns: the bluetooth device object 96 97 """ 98 99 def _retry_device_method(method_name, legal_falsy_values=[]): 100 """retry the emulated device's method. 101 102 The method is invoked as device.xxxx() e.g., device.GetAdvertisedName(). 103 104 Note that the method name string is provided to get the device's actual 105 method object at run time through getattr(). The rebinding is required 106 because a new device may have been created previously or during the 107 execution of fix_serial_device(). 108 109 Given a device's method, it is not feasible to get the method name 110 through __name__ attribute. 
This limitation is due to the fact that 111 the device is a dotted object of an XML RPC server proxy. 112 As an example, with the method name 'GetAdvertisedName', we could 113 derive the correspoinding method device.GetAdvertisedName. On the 114 contrary, given device.GetAdvertisedName, it is not feasible to get the 115 method name by device.GetAdvertisedName.__name__ 116 117 Also note that if the device method fails, we would try remediation 118 step and retry the device method. The remediation steps are 119 1) re-creating the serial device. 120 2) reset (powercycle) the bluetooth dongle. 121 3) reboot chameleond host. 122 If the device method still fails after these steps, we fail the test 123 124 The default values exist for uses of this function before the options 125 were added, ideally we should change zero_ok to False. 126 127 @param method_name: the string of the method name. 128 @param legal_falsy_values: Values that are falsy but might be OK. 129 130 @returns: the result returned by the device's method if the call was 131 successful 132 133 @raises: TestError if the devices's method fails or if repair of 134 peripheral kit fails 135 136 """ 137 138 action_table = [('recreate' , 'Fixing the serial device'), 139 ('reset', 'Power cycle the peer device'), 140 ('reboot', 'Reboot the chamleond host')] 141 142 for i, (action, description) in enumerate(action_table): 143 logging.info('Attempt %s : %s ', i+1, method_name) 144 145 result = _run_method(getattr(device, method_name), method_name) 146 if _is_successful(result, legal_falsy_values): 147 return result 148 149 logging.error('%s failed the %s time. 
Attempting to %s', 150 method_name,i,description) 151 if not fix_serial_device(chameleon, device, action): 152 logging.info('%s failed', description) 153 else: 154 logging.info('%s successful', description) 155 156 #try it last time after fix it by last action 157 result = _run_method(getattr(device, method_name), method_name) 158 if _is_successful(result, legal_falsy_values): 159 return result 160 161 raise error.TestError('Failed to execute %s. Bluetooth peer device is' 162 'not working' % method_name) 163 164 165 if device_type not in SUPPORTED_DEVICE_TYPES: 166 raise error.TestError('The device type is not supported: %s', 167 device_type) 168 169 # Get the bluetooth device object and query some important properties. 170 device = SUPPORTED_DEVICE_TYPES[device_type](chameleon)() 171 172 # Get some properties of the kit 173 # NOTE: Strings updated here must be kept in sync with Chameleon. 174 device._capabilities = _retry_device_method('GetCapabilities') 175 device._transports = device._capabilities["CAP_TRANSPORTS"] 176 device._is_le_only = ("TRANSPORT_LE" in device._transports and 177 len(device._transports) == 1) 178 device._has_pin = device._capabilities["CAP_HAS_PIN"] 179 device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"] 180 181 _retry_device_method('Init') 182 logging.info('device type: %s', device_type) 183 184 device.name = _retry_device_method('GetAdvertisedName') 185 logging.info('device name: %s', device.name) 186 187 device.address = _retry_device_method('GetLocalBluetoothAddress') 188 logging.info('address: %s', device.address) 189 190 pin_falsy_values = [] if device._has_pin else [None] 191 device.pin = _retry_device_method('GetPinCode', pin_falsy_values) 192 logging.info('pin: %s', device.pin) 193 194 class_falsy_values = [None] if device._is_le_only else [0] 195 196 # Class of service is None for LE-only devices. Don't fail or parse it. 
197 device.class_of_service = _retry_device_method('GetClassOfService', 198 class_falsy_values) 199 if device._is_le_only: 200 parsed_class_of_service = device.class_of_service 201 else: 202 parsed_class_of_service = "0x%04X" % device.class_of_service 203 logging.info('class of service: %s', parsed_class_of_service) 204 205 device.class_of_device = _retry_device_method('GetClassOfDevice', 206 class_falsy_values) 207 # Class of device is None for LE-only devices. Don't fail or parse it. 208 if device._is_le_only: 209 parsed_class_of_device = device.class_of_device 210 else: 211 parsed_class_of_device = "0x%04X" % device.class_of_device 212 logging.info('class of device: %s', parsed_class_of_device) 213 214 device.device_type = _retry_device_method('GetHIDDeviceType') 215 logging.info('device type: %s', device.device_type) 216 217 device.authentication_mode = None 218 if not device._is_le_only: 219 device.authentication_mode = _retry_device_method('GetAuthenticationMode') 220 logging.info('authentication mode: %s', device.authentication_mode) 221 222 device.port = _retry_device_method('GetPort') 223 logging.info('serial port: %s\n', device.port) 224 225 return device 226 227 228def recreate_serial_device(device): 229 """Create and connect to a new serial device. 230 231 @param device: the bluetooth HID device 232 233 @returns: True if the serial device is re-created successfully. 234 235 """ 236 logging.info('Remove the old serial device and create a new one.') 237 if device is not None: 238 try: 239 device.Close() 240 except: 241 logging.error('failed to close the serial device.') 242 return False 243 try: 244 device.CreateSerialDevice() 245 return True 246 except: 247 logging.error('failed to invoke CreateSerialDevice.') 248 return False 249 250 251def _check_device_init(device, operation): 252 # Check if the serial device could initialize, connect, and 253 # enter command mode correctly. 
254 logging.info('Checking device status...') 255 if not _run_method(device.Init, 'Init'): 256 logging.info('device.Init: failed after %s', operation) 257 return False 258 if not device.CheckSerialConnection(): 259 logging.info('device.CheckSerialConnection: failed after %s', operation) 260 return False 261 if not _run_method(device.EnterCommandMode, 'EnterCommandMode'): 262 logging.info('device.EnterCommandMode: failed after %s', operation) 263 return False 264 logging.info('The device is created successfully after %s.', operation) 265 return True 266 267def _reboot_chameleon(chameleon, device): 268 """ Reboot chameleond host 269 270 Also power cycle the device since reboot may not do that..""" 271 272 # Chameleond fizz hosts should have write protect removed and 273 # set_gbb_flags set to 0 to minimize boot time 274 REBOOT_SLEEP_SECS = 10 275 RESET_SLEEP_SECS = 1 276 277 # Close the bluetooth peripheral device and reboot the chameleon board. 278 device.Close() 279 logging.info("Powercycling the device") 280 device.PowerCycle() 281 time.sleep(RESET_SLEEP_SECS) 282 logging.info('rebooting chameleon...') 283 chameleon.reboot() 284 285 # Every chameleon reboot would take a bit more than REBOOT_SLEEP_SECS. 286 # Sleep REBOOT_SLEEP_SECS and then begin probing the chameleon board. 287 time.sleep(REBOOT_SLEEP_SECS) 288 return _check_device_init(device, 'reboot') 289 290def _reset_device_power(device): 291 """Power cycle the device.""" 292 RESET_SLEEP_SECS = 1 293 try: 294 if not device.PowerCycle(): 295 logging.info('device.PowerCycle() failed') 296 return False 297 except: 298 logging.error('exception in device.PowerCycle') 299 else: 300 logging.info('device powercycled') 301 time.sleep(RESET_SLEEP_SECS) 302 return _check_device_init(device, 'reset') 303 304def _is_successful(result, legal_falsy_values=[]): 305 """Is the method result considered successful? 
306 307 Some method results, for example that of class_of_service, may be 0 which is 308 considered a valid result. Occassionally, None is acceptable. 309 310 The default values exist for uses of this function before the options were 311 added, ideally we should change zero_ok to False. 312 313 @param result: a method result 314 @param legal_falsy_values: Values that are falsy but might be OK. 315 316 @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if 317 result is None and none_ok. 318 """ 319 truthiness_of_result = bool(result) 320 return truthiness_of_result or result in legal_falsy_values 321 322 323def fix_serial_device(chameleon, device, operation='reset'): 324 """Fix the serial device. 325 326 This function tries to fix the serial device by 327 (1) re-creating a serial device, or 328 (2) power cycling the usb port to which device is connected 329 (3) rebooting the chameleon board. 330 331 Argument operation determine which of the steps above are perform 332 333 Note that rebooting the chameleon board or reseting the device will remove 334 the state on the peripheral which might cause test failures. Please use 335 reset/reboot only before or after a test. 336 337 @param chameleon: the chameleon host 338 @param device: the bluetooth device. 339 @param operation: Recovery operation to perform 'recreate/reset/reboot' 340 341 @returns: True if the serial device is fixed. False otherwise. 342 343 """ 344 345 if operation == 'recreate': 346 # Check the serial connection. Fix it if needed. 347 if device.CheckSerialConnection(): 348 # The USB serial connection still exists. 349 # Re-connection suffices to solve the problem. The problem 350 # is usually caused by serial port change. For example, 351 # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1. 
352 logging.info('retry: creating a new serial device...') 353 return recreate_serial_device(device) 354 else: 355 # Recreate the bluetooth peer device 356 return _check_device_init(device, operation) 357 358 elif operation == 'reset': 359 # Powercycle the USB port where the bluetooth peer device is connected. 360 # RN-42 and RN-52 share the same vid:pid so both will be powercycled. 361 # This will only work on fizz host with write protection removed. 362 # Note that the state on the device will be lost. 363 return _reset_device_power(device) 364 365 elif operation == 'reboot': 366 # Reboot the chameleon host. 367 # The device is power cycled before rebooting chameleon host 368 return _reboot_chameleon(chameleon, device) 369 370 else: 371 logging.error('fix_serial_device Invalid operation %s', operation) 372 return False 373 374 375def retry(test_method, instance, *args, **kwargs): 376 """Execute the target facade test_method(). Retry if failing the first time. 377 378 A test_method is something like self.test_xxxx() in BluetoothAdapterTests, 379 e.g., BluetoothAdapterTests.test_bluetoothd_running(). 380 381 @param test_method: the test method to retry 382 383 @returns: True if the return value of test_method() is successful. 384 False otherwise. 385 386 """ 387 if _is_successful(_run_method(test_method, test_method.__name__, 388 instance, *args, **kwargs)): 389 return True 390 391 # Try to fix the serial device if applicable. 392 logging.error('%s failed at the 1st time: (%s)', test_method.__name__, 393 str(instance.results)) 394 395 # If this test does not use any attached serial device, just re-run 396 # the test. 
397 logging.info('%s: retry the 2nd time.', test_method.__name__) 398 time.sleep(1) 399 400 401 if not hasattr(instance, 'use_chameleon'): 402 return _is_successful(_run_method(test_method, test_method.__name__, 403 instance, *args, **kwargs)) 404 for device_type in SUPPORTED_DEVICE_TYPES: 405 for device in getattr(instance, 'devices')[device_type]: 406 #fix_serial_device in 'recreate' mode doesn't require chameleon 407 #so just pass None for convinent. 408 if not fix_serial_device(None, device, "recreate"): 409 return False 410 411 logging.info('%s: retry the 2nd time.', test_method.__name__) 412 return _is_successful(_run_method(test_method, test_method.__name__, 413 instance, *args, **kwargs)) 414 415 416def _test_retry_and_log(test_method_or_retry_flag): 417 """A decorator that logs test results, collects error messages, and retries 418 on request. 419 420 @param test_method_or_retry_flag: either the test_method or a retry_flag. 421 There are some possibilities of this argument: 422 1. the test_method to conduct and retry: should retry the test_method. 423 This occurs with 424 @_test_retry_and_log 425 2. the retry flag is True. Should retry the test_method. 426 This occurs with 427 @_test_retry_and_log(True) 428 3. the retry flag is False. Do not retry the test_method. 429 This occurs with 430 @_test_retry_and_log(False) 431 432 @returns: a wrapper of the test_method with test log. The retry mechanism 433 would depend on the retry flag. 434 435 """ 436 437 def decorator(test_method): 438 """A decorator wrapper of the decorated test_method. 439 440 @param test_method: the test method being decorated. 441 442 @returns the wrapper of the test method. 443 444 """ 445 @functools.wraps(test_method) 446 def wrapper(instance, *args, **kwargs): 447 """A wrapper of the decorated method. 
448 449 @param instance: an BluetoothAdapterTests instance 450 451 @returns the result of the test method 452 453 """ 454 instance.results = None 455 if callable(test_method_or_retry_flag) or test_method_or_retry_flag: 456 test_result = retry(test_method, instance, *args, **kwargs) 457 else: 458 test_result = test_method(instance, *args, **kwargs) 459 460 if test_result: 461 logging.info('[*** passed: %s]', test_method.__name__) 462 else: 463 fail_msg = '[--- failed: %s (%s)]' % (test_method.__name__, 464 str(instance.results)) 465 logging.error(fail_msg) 466 instance.fails.append(fail_msg) 467 return test_result 468 return wrapper 469 470 if callable(test_method_or_retry_flag): 471 # If the decorator function comes with no argument like 472 # @_test_retry_and_log 473 return decorator(test_method_or_retry_flag) 474 else: 475 # If the decorator function comes with an argument like 476 # @_test_retry_and_log(False) 477 return decorator 478 479 480def test_case_log(method): 481 """A decorator for test case methods. 482 483 The main purpose of this decorator is to display the test case name 484 in the test log which looks like 485 486 <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...> 487 488 @param method: the test case method to decorate. 489 490 @returns: a wrapper function of the decorated method. 491 492 """ 493 @functools.wraps(method) 494 def wrapper(instance, *args, **kwargs): 495 """Log the name of the wrapped method before execution""" 496 logging.info('\n<... %s ...>', method.__name__) 497 method(instance, *args, **kwargs) 498 return wrapper 499 500 501class BluetoothAdapterTests(test.test): 502 """Server side bluetooth adapter tests. 503 504 This test class tries to thoroughly verify most of the important work 505 states of a bluetooth adapter. 506 507 The various test methods are supposed to be invoked by actual autotest 508 tests such as server/cros/site_tests/bluetooth_Adapter*. 
509 510 """ 511 version = 1 512 ADAPTER_ACTION_SLEEP_SECS = 1 513 ADAPTER_PAIRING_TIMEOUT_SECS = 60 514 ADAPTER_CONNECTION_TIMEOUT_SECS = 30 515 ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30 516 ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3 517 ADAPTER_DISCOVER_TIMEOUT_SECS = 60 # 30 seconds too short sometimes 518 ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1 519 ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30 520 521 ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10 522 ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1 523 524 HID_REPORT_SLEEP_SECS = 1 525 526 527 DEFAULT_START_DELAY_SECS = 0 528 DEFAULT_HOLD_INTERVAL_SECS = 10 529 DEFAULT_HOLD_TIMEOUT_SECS = 60 530 DEFAULT_HOLD_SLEEP_SECS = 1 531 532 # Default suspend time in seconds for suspend resume. 533 SUSPEND_TIME_SECS=10 534 535 # hci0 is the default hci device if there is no external bluetooth dongle. 536 EXPECTED_HCI = 'hci0' 537 538 CLASS_OF_SERVICE_MASK = 0xFFE000 539 CLASS_OF_DEVICE_MASK = 0x001FFF 540 541 # Constants about advertising. 542 DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280 543 DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280 544 ADVERTISING_INTERVAL_UNIT = 0.625 545 546 # Error messages about advertising dbus methods. 547 ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = ( 548 'org.bluez.Error.Failed: Failed to register advertisement') 549 ERROR_INVALID_ADVERTISING_INTERVALS = ( 550 'org.bluez.Error.InvalidArguments: Invalid arguments') 551 552 # Supported profiles by chrome os. 553 SUPPORTED_UUIDS = { 554 'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb', 555 'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb', 556 'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb', 557 'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb', 558 'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb', 559 'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'} 560 561 # Board list for name/ID test check. 
These devices don't need to be tested 562 REFERENCE_BOARDS = ['rambi', 'nyan', 'oak', 'reef', 'yorp', 'bip'] 563 564 # Path for btmon logs 565 BTMON_DIR_LOG_PATH = '/var/log/btmon' 566 567 def group_chameleons_type(self): 568 """Group all chameleons by the type of their detected device.""" 569 570 # Use previously created chameleon_group instead of creating new 571 if len(self.chameleon_group_copy) > 0: 572 logging.info('Using previously created chameleon group') 573 for device_type in SUPPORTED_DEVICE_TYPES: 574 self.chameleon_group[device_type] = \ 575 self.chameleon_group_copy[device_type][:] 576 return 577 578 # Create new chameleon_group 579 for device_type in SUPPORTED_DEVICE_TYPES: 580 self.chameleon_group[device_type] = list() 581 # Create copy of chameleon_group 582 self.chameleon_group_copy[device_type] = list() 583 584 for idx,chameleon in enumerate(self.host.chameleon_list): 585 for device_type,gen_device_func in SUPPORTED_DEVICE_TYPES.items(): 586 try: 587 device = gen_device_func(chameleon)() 588 if device.CheckSerialConnection(): 589 self.chameleon_group[device_type].append(chameleon) 590 logging.info('%d-th chameleon find device %s', \ 591 idx, device_type) 592 # Create copy of chameleon_group 593 self.chameleon_group_copy[device_type].append(chameleon) 594 except: 595 logging.debug('Error with initializing %s on %d-th' 596 'chameleon', device_type, idx) 597 if len(self.chameleon_group[device_type]) == 0: 598 logging.error('No device is detected on %d-th chameleon', idx) 599 600 601 def wait_for_device(self, device): 602 """Waits for device to become available again 603 604 We reset raspberry pi peer between tests. This method helps us wait to 605 prevent us from trying to use the device before it comes back up again. 
606 607 @param device: proxy object of peripheral device 608 """ 609 610 def is_device_ready(): 611 """Tries to use a service of the device 612 613 @returns: True if device is available to provide service 614 False otherwise 615 """ 616 617 try: 618 # Call a simple (fast) function to determine if device is online 619 # and reachable. If we can query this property, we know the 620 # device is available for us to use 621 getattr(device, 'GetCapabilities')() 622 623 except Exception as e: 624 return False 625 626 return True 627 628 629 try: 630 utils.poll_for_condition(condition=is_device_ready, 631 desc='wait_for_device') 632 633 except utils.TimeoutError as e: 634 raise error.TestError('Peer is not available after waiting') 635 636 637 def clear_raspi_device(self, device): 638 """Clears a device on a raspi chameleon by resetting bluetooth stack 639 640 @param device: proxy object of peripheral device 641 """ 642 643 try: 644 device.ResetStack() 645 646 except SocketError as e: 647 # Ignore conn reset, expected during stack reset 648 if e.errno != errno.ECONNRESET: 649 raise 650 651 except chameleon.ChameleonConnectionError as e: 652 # Ignore chameleon conn reset, expected during stack reset 653 if str(errno.ECONNRESET) not in str(e): 654 raise 655 656 except httplib.BadStatusLine as e: 657 # BadStatusLine occurs occasionally when chameleon 658 # is restarted. We ignore it here 659 logging.error('Ignoring badstatusline exception') 660 pass 661 662 # Catch generic Fault exception by rpc server, ignore 663 # method not available as it indicates platform didn't 664 # support method and that's ok 665 except Exception, e: 666 if not (e.__class__.__name__ == 'Fault' and 667 'is not supported' in str(e)): 668 raise 669 670 # Ensure device is back online before continuing 671 self.wait_for_device(device) 672 673 674 def get_device_rasp(self, device_num, on_start=True): 675 """Get all bluetooth device objects from chameleons. 
676 This method should be called only after group_chameleons_type 677 @param device_num : dict of {device_type:number}, to specify the number 678 of device needed for each device_type. 679 680 @param on_start: boolean describing whether the requested clear is for a 681 new test, or in the middle of a current one 682 683 @returns: True if Success. 684 """ 685 686 for device_type, number in device_num.items(): 687 if len(self.chameleon_group[device_type]) < number: 688 logging.error('Number of chameleon with device type' 689 '%s is %d, which is less then needed %d', device_type, 690 len(self.chameleon_group[device_type]), number) 691 return False 692 693 for chameleon in self.chameleon_group[device_type][:number]: 694 device = get_bluetooth_emulated_device(chameleon, device_type) 695 696 # Re-fresh device to clean state if test is starting 697 if on_start: 698 self.clear_raspi_device(device) 699 700 try: 701 # Tell generic chameleon to bind to this device type 702 device.SpecifyDeviceType(device_type) 703 704 # Catch generic Fault exception by rpc server, ignore method not 705 # available as it indicates platform didn't support method and 706 # that's ok 707 except Exception, e: 708 if not (e.__class__.__name__ == 'Fault' and 709 'is not supported' in str(e)): 710 raise 711 712 self.devices[device_type].append(device) 713 714 # Remove this chameleon from chameleon_group since it is already 715 # configured as a specific device 716 for temp_device in SUPPORTED_DEVICE_TYPES: 717 if chameleon in self.chameleon_group[temp_device]: 718 self.chameleon_group[temp_device].remove(chameleon) 719 720 return True 721 722 723 def get_device(self, device_type, on_start=True): 724 """Get the bluetooth device object. 
725 726 @param device_type : the bluetooth device type, e.g., 'MOUSE' 727 728 @param on_start: boolean describing whether the requested clear is for a 729 new test, or in the middle of a current one 730 731 @returns: the bluetooth device object 732 733 """ 734 self.devices[device_type].append(get_bluetooth_emulated_device(\ 735 self.host.chameleon, device_type)) 736 737 # Re-fresh device to clean state if test is starting 738 if on_start: 739 self.clear_raspi_device(self.devices[device_type][-1]) 740 741 try: 742 # Tell generic chameleon to bind to this device type 743 self.devices[device_type][-1].SpecifyDeviceType(device_type) 744 745 # Catch generic Fault exception by rpc server, ignore method not 746 # available as it indicates platform didn't support method and that's 747 # ok 748 except Exception, e: 749 if not (e.__class__.__name__ == 'Fault' and 750 'is not supported' in str(e)): 751 raise 752 753 return self.devices[device_type][-1] 754 755 756 def is_device_available(self, chameleon, device_type): 757 """Determines if the named device is available on the linked chameleon 758 759 @param device_type: the bluetooth HID device type, e.g., 'MOUSE' 760 761 @returns: True if it is able to resolve the device, false otherwise 762 """ 763 764 device = SUPPORTED_DEVICE_TYPES[device_type](chameleon)() 765 try: 766 # The proxy prevents us from checking if the object is None directly 767 # so instead we call a fast method that any peripheral must support. 768 # This will fail if the object over the proxy doesn't exist 769 getattr(device, 'GetCapabilities')() 770 771 except Exception as e: 772 return False 773 774 return True 775 776 777 def list_devices_available(self): 778 """Queries which devices are available on chameleon/s 779 780 @returns: dict mapping HID device types to number of supporting peers 781 available, e.g. 
{'MOUSE':1, 'KEYBOARD':1} 782 """ 783 devices_available = {} 784 for device_type in SUPPORTED_DEVICE_TYPES: 785 for chameleon in self.host.chameleon_list: 786 if self.is_device_available(chameleon, device_type): 787 devices_available[device_type] = \ 788 devices_available.get(device_type, 0) + 1 789 790 return devices_available 791 792 793 def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS): 794 """Suspend the DUT for a while and then resume. 795 796 @param suspend_time: the suspend time in secs 797 798 """ 799 logging.info('The DUT suspends for %d seconds...', suspend_time) 800 try: 801 self.host.suspend(suspend_time=suspend_time) 802 except error.AutoservSuspendError: 803 logging.error('The DUT did not suspend for %d seconds', suspend_time) 804 pass 805 logging.info('The DUT is waken up.') 806 807 808 def reboot(self): 809 """Reboot the DUT and recreate necessary processes and variables""" 810 self.host.reboot() 811 812 # We need to recreate the bluetooth_facade after a reboot. 813 # Delete the proxy first so it won't delete the old one, which 814 # invokes disconnection, after creating the new one. 815 del self.factory 816 del self.bluetooth_facade 817 del self.input_facade 818 self.factory = remote_facade_factory.RemoteFacadeFactory(self.host, 819 disable_arc=True) 820 self.bluetooth_facade = self.factory.create_bluetooth_hid_facade() 821 self.input_facade = self.factory.create_input_facade() 822 823 # Re-enable debugging verbose since Chrome will set it to 824 # default(disable). 825 self.enable_disable_debug_log(enable=True) 826 827 self.start_new_btmon() 828 829 830 def _wait_till_condition_holds(self, func, method_name, 831 timeout=DEFAULT_HOLD_TIMEOUT_SECS, 832 sleep_interval=DEFAULT_HOLD_SLEEP_SECS, 833 hold_interval=DEFAULT_HOLD_INTERVAL_SECS, 834 start_delay=DEFAULT_START_DELAY_SECS): 835 """ Wait for the func() to hold true for a period of time 836 837 838 @param func: the function to wait for. 839 @param method_name: the invoking class method. 
@param timeout: number of seconds to wait before giving up.
    @param sleep_interval: the interval in seconds to sleep between
            invoking func().
    @param hold_interval: the interval in seconds for the condition to
            remain true
    @param start_delay: interval in seconds to wait before starting

    @returns: True if the condition is met,
              False otherwise

    """
    # Optionally give the system some time to settle before polling.
    if start_delay > 0:
        logging.debug('waiting for %s secs before checking %s',start_delay,
                      method_name)
        time.sleep(start_delay)

    try:
        utils.poll_till_condition_holds(condition=func,
                                        timeout=timeout,
                                        sleep_interval=sleep_interval,
                                        hold_interval = hold_interval,
                                        desc=('Waiting %s' % method_name))
        return True
    except utils.TimeoutError as e:
        logging.error('%s: %s', method_name, e)
    except Exception as e:
        logging.error('%s: %s', method_name, e)
        err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
        logging.error(err)
    except:
        logging.error('%s: unexpected error', method_name)
    # Every failure path above only logs; all of them end up here.
    return False


def _wait_for_condition(self, func, method_name,
                        timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
                        sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS,
                        start_delay=DEFAULT_START_DELAY_SECS):
    """Wait for the func() to become True.

    The polling itself is delegated to utils.poll_for_condition(). Any
    exception raised while polling is logged and reported as a False
    result instead of being propagated to the caller.

    @param func: the function to wait for.
    @param method_name: the invoking class method; only used in the
            log messages and in the polling description.
    @param timeout: number of seconds to wait before giving up.
    @param sleep_interval: the interval in seconds to sleep between
            invoking func().
    @param start_delay: interval in seconds to wait before starting

    @returns: True if the condition is met,
              False otherwise

    """

    # Optionally give the system some time to settle before polling.
    if start_delay > 0:
        logging.debug('waiting for %s secs before checking %s',start_delay,
                      method_name)
        time.sleep(start_delay)

    try:
        utils.poll_for_condition(condition=func,
                                 timeout=timeout,
                                 sleep_interval=sleep_interval,
                                 desc=('Waiting %s' % method_name))
        return True
    except utils.TimeoutError as e:
        logging.error('%s: %s', method_name, e)
    except Exception as e:
        logging.error('%s: %s', method_name, e)
        err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
        logging.error(err)
    except:
        logging.error('%s: unexpected error', method_name)
    # Every failure path above only logs; all of them end up here.
    return False


def ignore_failure(instance, test_method, *args, **kwargs):
    """Wrapper to prevent a test_method failure from failing the test batch

    Sometimes a test method needs to be used as a normal function, for its
    result. This wrapper prevents a test_method failure from being recorded
    in instance.fails and causing a failure of the quick test batch.

    @param instance: the object whose 'fails' list is preserved; it takes
            the place of the usual 'self' argument.
    @param test_method: the test method to invoke
    @param args: positional arguments passed through to test_method
    @param kwargs: keyword arguments passed through to test_method

    @returns: result of the test_method
    """

    # Snapshot (copy) the failure list so that it can be restored below.
    original_fails = instance.fails[:]
    test_result = test_method(*args, **kwargs)
    if not test_result:
        logging.info("%s failure is ignored",test_method.__name__)
        # Discard whatever was recorded while test_method was running.
        instance.fails = original_fails
    return test_result

# -------------------------------------------------------------------
# Adapter standalone tests
# -------------------------------------------------------------------


def enable_disable_debug_log(self, enable):
    """Enable or disable debug log in DUT

    @param enable: True to enable all of the debug log,
                   False to disable all of the debug log.
    """
    # int(True) == 1 and int(False) == 0; the same level is passed for
    # all four arguments of set_debug_log_levels().
    level = int(enable)
    self.bluetooth_facade.set_debug_log_levels(level, level, level, level)


def start_new_btmon(self):
    """Start a new btmon process and save the log.

    The log is written to a time-stamped 'btsnoop_*' file under
    self.BTMON_DIR_LOG_PATH on the host.
    """

    # Kill all btmon process before creating a new one
    # ('|| true' keeps the command successful when nothing was killed).
    self.host.run('pkill btmon || true')

    # Make sure the directory exists
    self.host.run('mkdir -p %s' % self.BTMON_DIR_LOG_PATH)

    # Time format. Ex, 2020_02_20_17_52_45
    now = time.strftime("%Y_%m_%d_%H_%M_%S")
    file_name = 'btsnoop_%s' % now
    self.host.run_background('btmon -SAw %s/%s' % (self.BTMON_DIR_LOG_PATH,
                                                   file_name))


def log_message(self, msg):
    """Write a string to log through the bluetooth facade.

    @param msg: the string to log
    """
    self.bluetooth_facade.log_message(msg)


@_test_retry_and_log
def test_bluetoothd_running(self):
    """Test that bluetoothd is running."""
    return self.bluetooth_facade.is_bluetoothd_running()


@_test_retry_and_log
def test_start_bluetoothd(self):
    """Test that bluetoothd could be started successfully."""
    return self.bluetooth_facade.start_bluetoothd()


@_test_retry_and_log
def test_stop_bluetoothd(self):
    """Test that bluetoothd could be stopped successfully."""
    return self.bluetooth_facade.stop_bluetoothd()


@_test_retry_and_log
def test_has_adapter(self):
    """Verify that there is an adapter. This will return True only if both
    the kernel and bluetooth daemon see the adapter.
    """
    return self.bluetooth_facade.has_adapter()

@_test_retry_and_log
def test_adapter_work_state(self):
    """Test that the bluetooth adapter is in the correct working state.

    This includes that the adapter is detectable, is powered on,
    and its hci device is self.EXPECTED_HCI.

    @returns: True if all of the checks pass. False otherwise.
    """
    has_adapter = self.bluetooth_facade.has_adapter()
    is_powered_on = self._wait_for_condition(
            self.bluetooth_facade.is_powered_on, method_name())
    hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
    # The individual outcomes are kept in self.results.
    self.results = {
            'has_adapter': has_adapter,
            'is_powered_on': is_powered_on,
            'hci': hci}
    return all(self.results.values())


@_test_retry_and_log
def test_power_on_adapter(self):
    """Test that the adapter could be powered on successfully.

    @returns: True if the power-on request succeeded and the adapter
              was then seen powered on. False otherwise.
    """
    power_on = self.bluetooth_facade.set_powered(True)
    is_powered_on = self._wait_for_condition(
            self.bluetooth_facade.is_powered_on, method_name())

    self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
    return all(self.results.values())


@_test_retry_and_log
def test_power_off_adapter(self):
    """Test that the adapter could be powered off successfully.

    @returns: True if the power-off request succeeded and the adapter
              was then seen powered off. False otherwise.
    """
    power_off = self.bluetooth_facade.set_powered(False)
    is_powered_off = self._wait_for_condition(
            lambda: not self.bluetooth_facade.is_powered_on(),
            method_name())

    self.results = {
            'power_off': power_off,
            'is_powered_off': is_powered_off}
    return all(self.results.values())


@_test_retry_and_log
def test_reset_on_adapter(self):
    """Test that the adapter could be reset on successfully.

    This includes restarting bluetoothd, and removing the settings
    and cached devices.

    @returns: True if reset_on() succeeded and the adapter was then
              seen powered on. False otherwise.
    """
    reset_on = self.bluetooth_facade.reset_on()
    is_powered_on = self._wait_for_condition(
            self.bluetooth_facade.is_powered_on, method_name())

    self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
    return all(self.results.values())


@_test_retry_and_log
def test_reset_off_adapter(self):
    """Test that the adapter could be reset off successfully.

    This includes restarting bluetoothd, and removing the settings
    and cached devices.

    @returns: True if reset_off() succeeded and the adapter was then
              seen powered off. False otherwise.
    """
    reset_off = self.bluetooth_facade.reset_off()
    is_powered_off = self._wait_for_condition(
            lambda: not self.bluetooth_facade.is_powered_on(),
            method_name())

    self.results = {
            'reset_off': reset_off,
            'is_powered_off': is_powered_off}
    return all(self.results.values())


@_test_retry_and_log
def test_UUIDs(self):
    """Test that basic profiles are supported.

    @returns: True if every UUID in self.SUPPORTED_UUIDS is reported by
              the adapter. False otherwise.
    """
    adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
    # self.results holds the expected UUIDs that the adapter is missing.
    self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
                    if uuid not in adapter_UUIDs]
    return not bool(self.results)


@_test_retry_and_log
def test_start_discovery(self):
    """Test that the adapter could start discovery.

    @returns: True if the start request succeeded and the adapter was
              then seen discovering. False otherwise.
    """
    # Only the first element of the returned pair is used here.
    start_discovery, _ = self.bluetooth_facade.start_discovery()
    is_discovering = self._wait_for_condition(
            self.bluetooth_facade.is_discovering, method_name())

    self.results = {
            'start_discovery': start_discovery,
            'is_discovering': is_discovering}
    return all(self.results.values())

@_test_retry_and_log(False)
def test_is_discovering(self):
    """Test that the adapter is already discovering."""
    is_discovering = self._wait_for_condition(
            self.bluetooth_facade.is_discovering, method_name())

    self.results = {'is_discovering': is_discovering}
    return all(self.results.values())

@_test_retry_and_log
def test_stop_discovery(self):
    """Test that the adapter could stop discovery.

    @returns: True if the stop request succeeded and the adapter was
              then seen not discovering. False otherwise.
    """
    # Only the first element of the returned pair is used here.
    stop_discovery, _ = self.bluetooth_facade.stop_discovery()
    is_not_discovering = self._wait_for_condition(
            lambda: not self.bluetooth_facade.is_discovering(),
            method_name())

    self.results = {
            'stop_discovery': stop_discovery,
            'is_not_discovering': is_not_discovering}
    return all(self.results.values())


@_test_retry_and_log 1112 def test_discoverable(self): 1113 """Test that the adapter could be set discoverable.""" 1114 set_discoverable = self.bluetooth_facade.set_discoverable(True) 1115 is_discoverable = self._wait_for_condition( 1116 self.bluetooth_facade.is_discoverable, method_name()) 1117 1118 self.results = { 1119 'set_discoverable': set_discoverable, 1120 'is_discoverable': is_discoverable} 1121 return all(self.results.values()) 1122 1123 @_test_retry_and_log(False) 1124 def test_is_discoverable(self): 1125 """Test that the adapter is discoverable.""" 1126 is_discoverable = self._wait_for_condition( 1127 self.bluetooth_facade.is_discoverable, method_name()) 1128 1129 self.results = {'is_discoverable': is_discoverable} 1130 return all(self.results.values()) 1131 1132 1133 def _test_timeout_property(self, set_property, check_property, set_timeout, 1134 get_timeout, property_name, 1135 timeout_values = [0, 60, 180]): 1136 """Common method to test (Discoverable/Pairable)Timeout property. 1137 1138 This is used to test 1139 - DiscoverableTimeout property 1140 - PairableTimeout property 1141 1142 The test performs the following 1143 - Set PropertyTimeout 1144 - Read PropertyTimeout and make sure values match 1145 - Set adapter propety 1146 - In a loop check if property is active 1147 - Test fails property is false before timeout 1148 - Test fails property is True after timeout 1149 Repeat the test for different values for timeout 1150 1151 Note : Value of 0 mean it never timeouts, so the test will 1152 end after 30 seconds. 
1153 """ 1154 def check_timeout(timeout): 1155 """Check for timeout value in loop while recording failures.""" 1156 actual_timeout = get_timeout() 1157 if timeout != actual_timeout: 1158 logging.debug('%s timeout value read %s does not ' 1159 'match value set %s, yet', property_name, 1160 actual_timeout, timeout) 1161 return False 1162 else: 1163 return True 1164 1165 def _test_timeout_property(timeout): 1166 # minium time after timeout before checking property 1167 MIN_DELTA_SECS = 3 1168 # Time between checking property 1169 WAIT_TIME_SECS = 2 1170 1171 # Set and read back the timeout value 1172 if not set_timeout(timeout): 1173 logging.error('Setting the %s timeout failed',property_name) 1174 return False 1175 1176 1177 if not self._wait_for_condition(lambda : check_timeout(timeout), 1178 'check_'+property_name): 1179 logging.error('checking %s_timeout value timed out', 1180 property_name) 1181 return False 1182 1183 # 1184 # Check that the timeout works 1185 # Check property is true until timeout 1186 # and then it is not 1187 1188 property_set = set_property(True) 1189 property_is_true = self._wait_for_condition(check_property, 1190 method_name()) 1191 1192 self.results = { 'set_%s' % property_name : property_set, 1193 'is_%s' % property_name: property_is_true} 1194 logging.debug(self.results) 1195 1196 if not all(self.results.values()): 1197 logging.error('Setting %s failed',property_name) 1198 return False 1199 1200 start_time = time.time() 1201 while True: 1202 time.sleep(WAIT_TIME_SECS) 1203 cur_time = time.time() 1204 property_set = check_property() 1205 time_elapsed = cur_time - start_time 1206 1207 # Ignore check_property results made near the timeout 1208 # to avoid spurious failures. 1209 if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS: 1210 continue 1211 1212 # Timeout of zero seconds mean that the adapter never times out 1213 # Check for 30 seconds and then exit the test. 
1214 if timeout == 0: 1215 if not property_set: 1216 logging.error('Adapter is not %s after %.2f ' 1217 'secs with a timeout of zero ', 1218 property_name, time_elapsed) 1219 return False 1220 elif time_elapsed > 30: 1221 logging.debug('Adapter %s after %.2f seconds ' 1222 'with timeout of zero as expected' , 1223 property_name, time_elapsed) 1224 return True 1225 continue 1226 1227 # 1228 # Check if property is true till timeout ends and 1229 # false afterwards 1230 # 1231 if time_elapsed < timeout: 1232 if not property_set: 1233 logging.error('Adapter is not %s after %.2f ' 1234 'secs before timeout of %.2f', 1235 property_name, time_elapsed, timeout) 1236 return False 1237 else: 1238 if property_set: 1239 logging.error('Adapter is still %s after ' 1240 ' %.2f secs with timeout of %.2f', 1241 property_name, time_elapsed, timeout) 1242 return False 1243 else: 1244 logging.debug('Adapter not %s after %.2f ' 1245 'secs with timeout of %.2f as expected ', 1246 property_name, time_elapsed, timeout) 1247 return True 1248 1249 default_value = check_property() 1250 default_timeout = get_timeout() 1251 1252 result = [] 1253 try: 1254 for timeout in timeout_values: 1255 result.append(_test_timeout_property(timeout)) 1256 logging.debug("Test returning %s", all(result)) 1257 return all(result) 1258 except: 1259 logging.error("exception in test_%s_timeout",property_name) 1260 raise 1261 finally: 1262 # Set the property back to default value permanently before 1263 # exiting the test 1264 set_timeout(0) 1265 set_property(default_value) 1266 # Set the timeout back to default value before exiting the test 1267 set_timeout(default_timeout) 1268 1269 1270 @_test_retry_and_log 1271 def test_discoverable_timeout(self, timeout_values = [0, 60, 180]): 1272 """Test adapter dbus property DiscoverableTimeout.""" 1273 return self._test_timeout_property( 1274 set_property = self.bluetooth_facade.set_discoverable, 1275 check_property = self.bluetooth_facade.is_discoverable, 1276 set_timeout 
= self.bluetooth_facade.set_discoverable_timeout, 1277 get_timeout = self.bluetooth_facade.get_discoverable_timeout, 1278 property_name = 'discoverable', 1279 timeout_values = timeout_values) 1280 1281 @_test_retry_and_log 1282 def test_pairable_timeout(self, timeout_values = [0, 60, 180]): 1283 """Test adapter dbus property PairableTimeout.""" 1284 return self._test_timeout_property( 1285 set_property = self.bluetooth_facade.set_pairable, 1286 check_property = self.bluetooth_facade.is_pairable, 1287 set_timeout = self.bluetooth_facade.set_pairable_timeout, 1288 get_timeout = self.bluetooth_facade.get_pairable_timeout, 1289 property_name = 'pairable', 1290 timeout_values = timeout_values) 1291 1292 1293 @_test_retry_and_log 1294 def test_nondiscoverable(self): 1295 """Test that the adapter could be set non-discoverable.""" 1296 set_nondiscoverable = self.bluetooth_facade.set_discoverable(False) 1297 is_nondiscoverable = self._wait_for_condition( 1298 lambda: not self.bluetooth_facade.is_discoverable(), 1299 method_name()) 1300 1301 self.results = { 1302 'set_nondiscoverable': set_nondiscoverable, 1303 'is_nondiscoverable': is_nondiscoverable} 1304 return all(self.results.values()) 1305 1306 1307 @_test_retry_and_log 1308 def test_pairable(self): 1309 """Test that the adapter could be set pairable.""" 1310 set_pairable = self.bluetooth_facade.set_pairable(True) 1311 is_pairable = self._wait_for_condition( 1312 self.bluetooth_facade.is_pairable, method_name()) 1313 1314 self.results = { 1315 'set_pairable': set_pairable, 1316 'is_pairable': is_pairable} 1317 return all(self.results.values()) 1318 1319 1320 @_test_retry_and_log 1321 def test_nonpairable(self): 1322 """Test that the adapter could be set non-pairable.""" 1323 set_nonpairable = self.bluetooth_facade.set_pairable(False) 1324 is_nonpairable = self._wait_for_condition( 1325 lambda: not self.bluetooth_facade.is_pairable(), method_name()) 1326 1327 self.results = { 1328 'set_nonpairable': set_nonpairable, 
1329 'is_nonpairable': is_nonpairable} 1330 return all(self.results.values()) 1331 1332 1333 @_test_retry_and_log(False) 1334 def test_check_valid_adapter_id(self): 1335 """Fail if the Bluetooth ID is not in the correct format. 1336 1337 @returns True if adapter ID follows expected format, False otherwise 1338 """ 1339 1340 # Boards which only support bluetooth version 3 and below 1341 BLUETOOTH_3_BOARDS = ['x86-mario', 'x86-zgb'] 1342 1343 device = self.host.get_platform() 1344 adapter_info = self.get_adapter_properties() 1345 1346 # Don't complete test if this is a reference board 1347 if device in self.REFERENCE_BOARDS: 1348 return True 1349 1350 modalias = adapter_info['Modalias'] 1351 logging.debug('Saw Bluetooth ID of: %s', modalias) 1352 1353 if device in BLUETOOTH_3_BOARDS: 1354 bt_format = 'bluetooth:v00E0p24..d0300' 1355 else: 1356 bt_format = 'bluetooth:v00E0p24..d0400' 1357 1358 if not re.match(bt_format, modalias): 1359 return False 1360 1361 return True 1362 1363 1364 @_test_retry_and_log(False) 1365 def test_check_valid_alias(self): 1366 """Fail if the Bluetooth alias is not in the correct format. 
1367 1368 @returns True if adapter alias follows expected format, False otherwise 1369 """ 1370 1371 device = self.host.get_platform() 1372 adapter_info = self.get_adapter_properties() 1373 1374 # Don't complete test if this is a reference board 1375 if device in self.REFERENCE_BOARDS: 1376 return True 1377 1378 alias = adapter_info['Alias'] 1379 logging.debug('Saw Bluetooth Alias of: %s', alias) 1380 1381 device_type = self.host.get_board_type().lower() 1382 alias_format = '%s_[a-z0-9]{4}' % device_type 1383 if not re.match(alias_format, alias.lower()): 1384 return False 1385 1386 return True 1387 1388 1389 # ------------------------------------------------------------------- 1390 # Tests about general discovering, pairing, and connection 1391 # ------------------------------------------------------------------- 1392 1393 1394 @_test_retry_and_log(False) 1395 def test_discover_device(self, device_address): 1396 """Test that the adapter could discover the specified device address. 1397 1398 @param device_address: Address of the device. 1399 1400 @returns: True if the device is found. False otherwise. 1401 1402 """ 1403 has_device_initially = False 1404 start_discovery = False 1405 device_discovered = False 1406 has_device = self.bluetooth_facade.has_device 1407 1408 if has_device(device_address): 1409 has_device_initially = True 1410 else: 1411 start_discovery, _ = self.bluetooth_facade.start_discovery() 1412 if start_discovery: 1413 try: 1414 utils.poll_for_condition( 1415 condition=(lambda: has_device(device_address)), 1416 timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS, 1417 sleep_interval= 1418 self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS, 1419 desc='Waiting for discovering %s' % device_address) 1420 device_discovered = True 1421 except utils.TimeoutError as e: 1422 logging.error('test_discover_device: %s', e) 1423 except Exception as e: 1424 logging.error('test_discover_device: %s', e) 1425 err = ('bluetoothd probably crashed.' 
1426 'Check out /var/log/messages') 1427 logging.error(err) 1428 except: 1429 logging.error('test_discover_device: unexpected error') 1430 1431 self.results = { 1432 'has_device_initially': has_device_initially, 1433 'start_discovery': start_discovery, 1434 'device_discovered': device_discovered} 1435 return has_device_initially or device_discovered 1436 1437 def _test_discover_by_device(self, device): 1438 device_discovered = device.Discover(self.bluetooth_facade.address) 1439 1440 self.results = { 1441 'device_discovered': device_discovered 1442 } 1443 1444 return all(self.results.values()) 1445 1446 @_test_retry_and_log(False) 1447 def test_discover_by_device(self, device): 1448 """Test that the device could discover the adapter address. 1449 1450 @param device: Meta device to represent peer device. 1451 1452 @returns: True if the adapter is found by the device. 1453 """ 1454 return self._test_discover_by_device(device) 1455 1456 @_test_retry_and_log(False) 1457 def test_discover_by_device_fails(self, device): 1458 """Test that the device could not discover the adapter address. 1459 1460 @param device: Meta device to represent peer device. 1461 1462 @returns False if the adapter is found by the device. 1463 """ 1464 return not self._test_discover_by_device(device) 1465 1466 @_test_retry_and_log(False) 1467 def test_device_set_discoverable(self, device, discoverable): 1468 """Test that we could set the peer device to discoverable. """ 1469 try: 1470 device.SetDiscoverable(discoverable) 1471 except: 1472 return False 1473 1474 return True 1475 1476 @_test_retry_and_log 1477 def test_pairing(self, device_address, pin, trusted=True): 1478 """Test that the adapter could pair with the device successfully. 1479 1480 @param device_address: Address of the device. 1481 @param pin: pin code to pair with the device. 1482 @param trusted: indicating whether to set the device trusted. 1483 1484 @returns: True if pairing succeeds. False otherwise. 
1485 1486 """ 1487 1488 def _pair_device(): 1489 """Pair to the device. 1490 1491 @returns: True if it could pair with the device. False otherwise. 1492 1493 """ 1494 return self.bluetooth_facade.pair_legacy_device( 1495 device_address, pin, trusted, 1496 self.ADAPTER_PAIRING_TIMEOUT_SECS) 1497 1498 1499 def _verify_connection_info(): 1500 """Verify that connection info to device is retrievable. 1501 1502 @returns: True if the connection info is retrievable. 1503 False otherwise. 1504 """ 1505 return (self.bluetooth_facade.get_connection_info(device_address) 1506 is not None) 1507 1508 1509 has_device = False 1510 paired = False 1511 connection_info_retrievable = False 1512 if self.bluetooth_facade.has_device(device_address): 1513 has_device = True 1514 try: 1515 utils.poll_for_condition( 1516 condition=_pair_device, 1517 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 1518 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 1519 desc='Waiting for pairing %s' % device_address) 1520 paired = True 1521 except utils.TimeoutError as e: 1522 logging.error('test_pairing: %s', e) 1523 except: 1524 logging.error('test_pairing: unexpected error') 1525 1526 connection_info_retrievable = _verify_connection_info() 1527 1528 self.results = { 1529 'has_device': has_device, 1530 'paired': paired, 1531 'connection_info_retrievable': connection_info_retrievable} 1532 return all(self.results.values()) 1533 1534 1535 @_test_retry_and_log 1536 def test_remove_pairing(self, device_address): 1537 """Test that the adapter could remove the paired device. 1538 1539 @param device_address: Address of the device. 1540 1541 @returns: True if the device is removed successfully. False otherwise. 
1542 1543 """ 1544 device_is_paired_initially = self.bluetooth_facade.device_is_paired( 1545 device_address) 1546 remove_pairing = False 1547 pairing_removed = False 1548 1549 if device_is_paired_initially: 1550 remove_pairing = self.bluetooth_facade.remove_device_object( 1551 device_address) 1552 pairing_removed = not self.bluetooth_facade.device_is_paired( 1553 device_address) 1554 1555 self.results = { 1556 'device_is_paired_initially': device_is_paired_initially, 1557 'remove_pairing': remove_pairing, 1558 'pairing_removed': pairing_removed} 1559 return all(self.results.values()) 1560 1561 1562 def test_set_trusted(self, device_address, trusted=True): 1563 """Test whether the device with the specified address is trusted. 1564 1565 @param device_address: Address of the device. 1566 @param trusted : True or False indicating if trusted is expected. 1567 1568 @returns: True if the device's "Trusted" property is as specified; 1569 False otherwise. 1570 1571 """ 1572 1573 set_trusted = self.bluetooth_facade.set_trusted( 1574 device_address, trusted) 1575 1576 actual_trusted = self.bluetooth_facade.get_device_property( 1577 device_address, 'Trusted') 1578 1579 self.results = { 1580 'set_trusted': set_trusted, 1581 'actual trusted': actual_trusted, 1582 'expected trusted': trusted} 1583 return actual_trusted == trusted 1584 1585 1586 @_test_retry_and_log 1587 def test_connection_by_adapter(self, device_address): 1588 """Test that the adapter of dut could connect to the device successfully 1589 1590 It is the caller's responsibility to pair to the device before 1591 doing connection. 1592 1593 @param device_address: Address of the device. 1594 1595 @returns: True if connection is performed. False otherwise. 1596 1597 """ 1598 1599 def _connect_device(): 1600 """Connect to the device. 1601 1602 @returns: True if it could connect to the device. False otherwise. 
1603 1604 """ 1605 return self.bluetooth_facade.connect_device(device_address) 1606 1607 1608 has_device = False 1609 connected = False 1610 if self.bluetooth_facade.has_device(device_address): 1611 has_device = True 1612 try: 1613 utils.poll_for_condition( 1614 condition=_connect_device, 1615 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 1616 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 1617 desc='Waiting for connecting to %s' % device_address) 1618 connected = True 1619 except utils.TimeoutError as e: 1620 logging.error('test_connection_by_adapter: %s', e) 1621 except: 1622 logging.error('test_connection_by_adapter: unexpected error') 1623 1624 self.results = {'has_device': has_device, 'connected': connected} 1625 return all(self.results.values()) 1626 1627 1628 @_test_retry_and_log 1629 def test_disconnection_by_adapter(self, device_address): 1630 """Test that the adapter of dut could disconnect the device successfully 1631 1632 @param device_address: Address of the device. 1633 1634 @returns: True if disconnection is performed. False otherwise. 1635 1636 """ 1637 return self.bluetooth_facade.disconnect_device(device_address) 1638 1639 1640 def _enter_command_mode(self, device): 1641 """Let the device enter command mode. 1642 1643 Before using the device, need to call this method to make sure 1644 it is in the command mode. 1645 1646 @param device: the bluetooth HID device 1647 1648 @returns: True if successful. False otherwise. 1649 1650 """ 1651 result = _is_successful(_run_method(device.EnterCommandMode, 1652 'EnterCommandMode')) 1653 if not result: 1654 logging.error('EnterCommandMode failed') 1655 return result 1656 1657 1658 @_test_retry_and_log 1659 def test_connection_by_device(self, device): 1660 """Test that the device could connect to the adapter successfully. 1661 1662 This emulates the behavior that a device may initiate a 1663 connection request after waking up from power saving mode. 
1664 1665 @param device: the bluetooth HID device 1666 1667 @returns: True if connection is performed correctly by device and 1668 the adapter also enters connection state. 1669 False otherwise. 1670 1671 """ 1672 if not self._enter_command_mode(device): 1673 return False 1674 1675 method_name = 'test_connection_by_device' 1676 connection_by_device = False 1677 adapter_address = self.bluetooth_facade.address 1678 try: 1679 device.ConnectToRemoteAddress(adapter_address) 1680 connection_by_device = True 1681 except Exception as e: 1682 logging.error('%s (device): %s', method_name, e) 1683 except: 1684 logging.error('%s (device): unexpected error', method_name) 1685 1686 connection_seen_by_adapter = False 1687 device_address = device.address 1688 device_is_connected = self.bluetooth_facade.device_is_connected 1689 try: 1690 utils.poll_for_condition( 1691 condition=lambda: device_is_connected(device_address), 1692 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 1693 desc=('Waiting for connection from %s' % device_address)) 1694 connection_seen_by_adapter = True 1695 except utils.TimeoutError as e: 1696 logging.error('%s (adapter): %s', method_name, e) 1697 except: 1698 logging.error('%s (adapter): unexpected error', method_name) 1699 1700 self.results = { 1701 'connection_by_device': connection_by_device, 1702 'connection_seen_by_adapter': connection_seen_by_adapter} 1703 return all(self.results.values()) 1704 1705 @_test_retry_and_log 1706 def test_connection_by_device_only(self, device, adapter_address): 1707 """Test that the device could connect to adapter successfully. 1708 1709 This is a modified version of test_connection_by_device that only 1710 communicates with the peer device and not the host (in case the host is 1711 suspended for example). 1712 1713 @param device: the bluetooth peer device 1714 @param adapter_address: address of the adapter 1715 1716 @returns: True if the connection was established by the device or False. 
1717 """ 1718 connected = device.ConnectToRemoteAddress(adapter_address) 1719 self.results = { 1720 'connection_by_device': connected 1721 } 1722 1723 return all(self.results.values()) 1724 1725 1726 @_test_retry_and_log 1727 def test_disconnection_by_device(self, device): 1728 """Test that the device could disconnect the adapter successfully. 1729 1730 This emulates the behavior that a device may initiate a 1731 disconnection request before going into power saving mode. 1732 1733 Note: should not try to enter command mode in this method. When 1734 a device is connected, there is no way to enter command mode. 1735 One could just issue a special disconnect command without 1736 entering command mode. 1737 1738 @param device: the bluetooth HID device 1739 1740 @returns: True if disconnection is performed correctly by device and 1741 the adapter also observes the disconnection. 1742 False otherwise. 1743 1744 """ 1745 method_name = 'test_disconnection_by_device' 1746 disconnection_by_device = False 1747 try: 1748 device.Disconnect() 1749 disconnection_by_device = True 1750 except Exception as e: 1751 logging.error('%s (device): %s', method_name, e) 1752 except: 1753 logging.error('%s (device): unexpected error', method_name) 1754 1755 disconnection_seen_by_adapter = False 1756 device_address = device.address 1757 device_is_connected = self.bluetooth_facade.device_is_connected 1758 try: 1759 utils.poll_for_condition( 1760 condition=lambda: not device_is_connected(device_address), 1761 timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS, 1762 desc=('Waiting for disconnection from %s' % device_address)) 1763 disconnection_seen_by_adapter = True 1764 except utils.TimeoutError as e: 1765 logging.error('%s (adapter): %s', method_name, e) 1766 except: 1767 logging.error('%s (adapter): unexpected error', method_name) 1768 1769 self.results = { 1770 'disconnection_by_device': disconnection_by_device, 1771 'disconnection_seen_by_adapter': disconnection_seen_by_adapter} 1772 return 
all(self.results.values()) 1773 1774 1775 @_test_retry_and_log(False) 1776 def test_device_is_connected(self, device_address): 1777 """Test that device address given is currently connected. 1778 1779 @param device_address: Address of the device. 1780 1781 @returns: True if the device is connected. 1782 False otherwise. 1783 """ 1784 1785 def _is_connected(): 1786 """Test if device is connected. 1787 1788 @returns: True if device is connected. False otherwise. 1789 1790 """ 1791 return self.bluetooth_facade.device_is_connected(device_address) 1792 1793 1794 method_name = 'test_device_is_connected' 1795 has_device = False 1796 connected = False 1797 if self.bluetooth_facade.has_device(device_address): 1798 has_device = True 1799 try: 1800 utils.poll_for_condition( 1801 condition=_is_connected, 1802 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 1803 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 1804 desc='Waiting to check connection to %s' % 1805 device_address) 1806 connected = True 1807 except utils.TimeoutError as e: 1808 logging.error('%s: %s', method_name, e) 1809 except: 1810 logging.error('%s: unexpected error', method_name) 1811 self.results = {'has_device': has_device, 'connected': connected} 1812 return all(self.results.values()) 1813 1814 1815 @_test_retry_and_log(False) 1816 def test_device_is_not_connected(self, device_address): 1817 """Test that device address given is NOT currently connected. 1818 1819 @param device_address: Address of the device. 1820 1821 @returns: True if the device is NOT connected. 1822 False otherwise. 1823 1824 """ 1825 1826 def _is_not_connected(): 1827 """Test if device is not connected. 1828 1829 @returns: True if device is not connected. False otherwise. 
1830 1831 """ 1832 return not self.bluetooth_facade.device_is_connected( 1833 device_address) 1834 1835 1836 method_name = 'test_device_is_not_connected' 1837 not_connected = False 1838 if self.bluetooth_facade.has_device(device_address): 1839 try: 1840 utils.poll_for_condition( 1841 condition=_is_not_connected, 1842 timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS, 1843 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 1844 desc='Waiting to check connection to %s' % 1845 device_address) 1846 not_connected = True 1847 except utils.TimeoutError as e: 1848 logging.error('%s: %s', method_name, e) 1849 except: 1850 logging.error('%s: unexpected error', method_name) 1851 raise 1852 else: 1853 not_connected = True 1854 self.results = {'not_connected': not_connected} 1855 return all(self.results.values()) 1856 1857 1858 @_test_retry_and_log 1859 def test_device_is_paired(self, device_address): 1860 """Test that the device address given is currently paired. 1861 1862 @param device_address: Address of the device. 1863 1864 @returns: True if the device is paired. 1865 False otherwise. 1866 1867 """ 1868 def _is_paired(): 1869 """Test if device is paired. 1870 1871 @returns: True if device is paired. False otherwise. 
1872 1873 """ 1874 return self.bluetooth_facade.device_is_paired(device_address) 1875 1876 1877 method_name = 'test_device_is_paired' 1878 has_device = False 1879 paired = False 1880 if self.bluetooth_facade.has_device(device_address): 1881 has_device = True 1882 try: 1883 utils.poll_for_condition( 1884 condition=_is_paired, 1885 timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS, 1886 sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS, 1887 desc='Waiting for connection to %s' % device_address) 1888 paired = True 1889 except utils.TimeoutError as e: 1890 logging.error('%s: %s', method_name, e) 1891 except: 1892 logging.error('%s: unexpected error', method_name) 1893 self.results = {'has_device': has_device, 'paired': paired} 1894 return all(self.results.values()) 1895 1896 1897 def _get_device_name(self, device_address): 1898 """Get the device name. 1899 1900 @returns: True if the device name is derived. None otherwise. 1901 1902 """ 1903 1904 self.discovered_device_name = self.bluetooth_facade.get_device_property( 1905 device_address, 'Name') 1906 1907 return bool(self.discovered_device_name) 1908 1909 1910 @_test_retry_and_log 1911 def test_device_name(self, device_address, expected_device_name): 1912 """Test that the device name discovered by the adapter is correct. 1913 1914 @param device_address: Address of the device. 1915 @param expected_device_name: the bluetooth device name 1916 1917 @returns: True if the discovered_device_name is expected_device_name. 1918 False otherwise. 
@_test_retry_and_log
def test_device_class_of_service(self, device_address,
                                 expected_class_of_service):
    """Check the class-of-service bits of the device's 'Class' property.

    The 'Class' property is masked with CLASS_OF_SERVICE_MASK; a falsy
    property value yields a discovered class of service of None.

    @param device_address: Address of the device.
    @param expected_class_of_service: the expected class of service

    @returns: True if the discovered class of service matches the
              expected class of service. False otherwise.

    """
    raw_class = self.bluetooth_facade.get_device_property(device_address,
                                                          'Class')
    if raw_class:
        service_bits = raw_class & self.CLASS_OF_SERVICE_MASK
    else:
        service_bits = None

    self.results = {
            'device_class': raw_class,
            'expected_class_of_service': expected_class_of_service,
            'discovered_class_of_service': service_bits}
    return service_bits == expected_class_of_service


@_test_retry_and_log
def test_device_class_of_device(self, device_address,
                                expected_class_of_device):
    """Check the class-of-device bits of the device's 'Class' property.

    The 'Class' property is masked with CLASS_OF_DEVICE_MASK; a falsy
    property value yields a discovered class of device of None.

    @param device_address: Address of the device.
    @param expected_class_of_device: the expected class of device

    @returns: True if the discovered class of device matches the
              expected class of device. False otherwise.

    """
    raw_class = self.bluetooth_facade.get_device_property(device_address,
                                                          'Class')
    if raw_class:
        device_bits = raw_class & self.CLASS_OF_DEVICE_MASK
    else:
        device_bits = None

    self.results = {
            'device_class': raw_class,
            'expected_class_of_device': expected_class_of_device,
            'discovered_class_of_device': device_bits}
    return device_bits == expected_class_of_device
def _get_btmon_log(self, method, logging_timespan=1):
    """Capture the btmon log when executing the specified method.

    The return value of method() is stored in self.advertising_msg
    ('' when method is None).

    @param method: the method to capture log.
                   The method would be executed only when it is not None.
                   This allows us to simply capture btmon log without
                   executing any command.
    @param logging_timespan: capture btmon log for logging_timespan seconds.

    """
    self.bluetooth_le_facade.btmon_start()
    try:
        self.advertising_msg = method() if method else ''
        time.sleep(logging_timespan)
    finally:
        # Always stop the capture, even when method() raises, so that a
        # failed command does not leave btmon running.
        self.bluetooth_le_facade.btmon_stop()


def convert_to_adv_jiffies(self, adv_interval_ms):
    """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.

    @param adv_interval_ms: an advertising interval

    @returns: the equivalent jiffies, i.e. adv_interval_ms divided by
              ADVERTISING_INTERVAL_UNIT.

    """
    return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT


def compute_duration(self, max_adv_interval_ms):
    """Compute duration from max_adv_interval_ms.

    Advertising duration is calculated approximately as
        duration = max_adv_interval_ms / 1000.0 * 1.1

    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: duration in seconds.

    """
    return max_adv_interval_ms / 1000.0 * 1.1


def compute_logging_timespan(self, max_adv_interval_ms):
    """Compute the logging timespan from max_adv_interval_ms.

    The logging timespan is the time needed to record btmon log: one
    duration per currently counted advertisement, but never less than
    1 second.

    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: logging_timespan in seconds.

    """
    duration = self.compute_duration(max_adv_interval_ms)
    return max(self.count_advertisements * duration, 1)
@_test_retry_and_log(False)
def test_check_duration_and_intervals(self, min_adv_interval_ms,
                                      max_adv_interval_ms,
                                      number_advertisements):
    """Verify that every advertisements are scheduled according to the
    duration and intervals.

    An advertisement would be scheduled at the time span of
         duration * number_advertisements

    The btmon log is scanned for 'LE Set Advertising Data' timestamps,
    which are grouped by the manufacturer id that follows them. The gap
    between consecutive timestamps of the same manufacturer id must be
    within 10% of the expected time span.

    @param min_adv_interval_ms: min advertising interval in milliseconds.
    @param max_adv_interval_ms: max advertising interval in milliseconds.
    @param number_advertisements: the number of existing advertisements

    @returns: True if all advertisements are scheduled based on the
            duration and intervals.

    """

    def within_tolerance(expected, actual, max_error=0.1):
        """Determine if the percent error is within specified tolerance.

        @param expected: The expected value.
        @param actual: The actual (measured) value.
        @param max_error: The maximum percent error acceptable.

        @returns: True if the percent error is less than or equal to
                  max_error.
        """
        if expected == 0:
            # Percent error is undefined; only an exact match passes.
            return actual == 0
        return abs(expected - actual) / float(abs(expected)) <= max_error


    start_str = 'Set Advertising Intervals:'
    search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
    search_str = '|'.join(search_strings)

    contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
                                                  start_str=start_str)

    # Company string looks like
    #   Company: not assigned (65283)
    company_pattern = re.compile(r'Company:.*\((\d+)\)')

    # The string with timestamp looks like
    #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
    # The gap before the timestamp must be matched non-greedily: a greedy
    # '.*' swallows the leading digits of timestamps >= 10 seconds
    # (13.799236 would be captured as 3.799236).
    set_adv_time_pattern = re.compile(
            r'LE Set Advertising Data.*\[hci\d+\].*?(\d+\.\d+)')

    adv_timestamps = {}
    timestamp = None
    manufacturer_id = None
    for line in contents:
        result = set_adv_time_pattern.search(line)
        if result:
            timestamp = float(result.group(1))

        result = company_pattern.search(line)
        if result:
            manufacturer_id = '0x%04x' % int(result.group(1))

        # Compare against None so that a timestamp of 0.0 is not dropped.
        if timestamp is not None and manufacturer_id is not None:
            adv_timestamps.setdefault(manufacturer_id, []).append(timestamp)
            timestamp = None
            manufacturer_id = None

    duration = self.compute_duration(max_adv_interval_ms)
    expected_timespan = duration * number_advertisements

    check_duration = True
    for manufacturer_id, values in adv_timestamps.items():
        logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
        timespans = [later - earlier
                     for earlier, later in zip(values, values[1:])]
        errors = [timespan for timespan in timespans
                  if not within_tolerance(expected_timespan, timespan)]
        logging.debug('timespans: %s', timespans)
        logging.debug('errors: %s', errors)
        if errors:
            check_duration = False

    # Verify that the advertising intervals are also correct.
    min_adv_interval_ms_found, max_adv_interval_ms_found = (
            self._verify_advertising_intervals(min_adv_interval_ms,
                                               max_adv_interval_ms))

    # Report both the min and the max interval results. Listing the max
    # key twice would leave the min interval unchecked.
    self.results = {
            'check_duration': check_duration,
            'min_adv_interval_ms_found': min_adv_interval_ms_found,
            'max_adv_interval_ms_found': max_adv_interval_ms_found,
    }
    return all(self.results.values())
def _get_min_max_intervals_strings(self, min_adv_interval_ms,
                                   max_adv_interval_ms):
    """Get the min and max advertising intervals strings shown in btmon.

    Advertising intervals shown in the btmon log look like
        Min advertising interval: 1280.000 msec (0x0800)
        Max advertising interval: 1280.000 msec (0x0800)

    @param min_adv_interval_ms: min advertising interval in milliseconds.
    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: the min and max intervals strings.

    """
    # '%x' needs an integer: dividing by ADVERTISING_INTERVAL_UNIT gives a
    # float, which Python 3 rejects and Python 2 only accepts with a
    # deprecation warning. int() keeps the historical truncation.
    min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
               (min_adv_interval_ms,
                int(min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)))
    logging.debug('min_adv_interval_ms: %s', min_str)

    max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
               (max_adv_interval_ms,
                int(max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)))
    logging.debug('max_adv_interval_ms: %s', max_str)

    return (min_str, max_str)


def _verify_advertising_intervals(self, min_adv_interval_ms,
                                  max_adv_interval_ms):
    """Verify min and max advertising intervals.

    Advertising intervals look like
        Min advertising interval: 1280.000 msec (0x0800)
        Max advertising interval: 1280.000 msec (0x0800)

    @param min_adv_interval_ms: min advertising interval in milliseconds.
    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: a tuple of (True, True) if both min and max advertising
              intervals could be found. Otherwise, the corresponding element
              in the tuple is False.

    """
    min_str, max_str = self._get_min_max_intervals_strings(
            min_adv_interval_ms, max_adv_interval_ms)

    min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
    max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)

    return min_adv_interval_ms_found, max_adv_interval_ms_found
@_test_retry_and_log(False)
def test_register_advertisement(self, advertisement_data, instance_id,
                                min_adv_interval_ms, max_adv_interval_ms):
    """Verify that an advertisement is registered correctly.

    This test verifies the following data:
    - advertisement added
    - manufacturer data
    - service UUIDs
    - service data
    - advertising intervals
    - advertising enabled

    @param advertisement_data: the data of an advertisement to register.
    @param instance_id: the instance id which starts at 1.
    @param min_adv_interval_ms: min_adv_interval in milliseconds.
    @param max_adv_interval_ms: max_adv_interval in milliseconds.

    @returns: True if the advertisement is registered correctly.
              False otherwise.

    """
    # When registering a new advertisement, it is possible that another
    # instance is advertising. It may need to wait for all other
    # advertisements to complete advertising once.
    self.count_advertisements += 1
    logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.register_advertisement(
                    advertisement_data),
            logging_timespan=logging_timespan)

    # Verify that a new advertisement is added.
    advertisement_added = (
            self.bluetooth_le_facade.btmon_find('Advertising Added') and
            self.bluetooth_le_facade.btmon_find('Instance: %d' %
                                                instance_id))

    # Verify that the manufacturer data could be found.
    # Start from True so that an advertisement without ManufacturerData
    # does not leave the flag unbound, and stop at the first missing id so
    # that a later match cannot hide an earlier miss.
    manufacturer_data_found = True
    for manufacturer_id in advertisement_data.get('ManufacturerData', {}):
        # The 'not assigned' text below means the manufacturer id
        # is not actually assigned to any real manufacturer.
        # For real 16-bit manufacturer UUIDs, refer to
        #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
        if not self.bluetooth_le_facade.btmon_find(
                'Company: not assigned (%d)' % int(manufacturer_id, 16)):
            manufacturer_data_found = False
            break

    # Verify that all service UUIDs could be found.
    service_uuids_found = True
    for uuid in advertisement_data.get('ServiceUUIDs', []):
        # Service UUIDs looks like ['0x180D', '0x180F']
        #   Heart Rate (0x180D)
        #   Battery Service (0x180F)
        # For actual 16-bit service UUIDs, refer to
        #   https://www.bluetooth.com/specifications/gatt/services
        if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
            service_uuids_found = False
            break

    # Verify service data.
    service_data_found = True
    for uuid, data in advertisement_data.get('ServiceData', {}).items():
        # A service data looks like
        #   Service Data (UUID 0x9999): 0001020304
        # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
        data_str = ''.join('%02x' % n for n in data)
        if not self.bluetooth_le_facade.btmon_find(
                'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
            service_data_found = False
            break

    # Verify that the advertising intervals are correct.
    min_adv_interval_ms_found, max_adv_interval_ms_found = (
            self._verify_advertising_intervals(min_adv_interval_ms,
                                               max_adv_interval_ms))

    # Verify advertising is enabled.
    advertising_enabled = self.bluetooth_le_facade.btmon_find(
            'Advertising: Enabled (0x01)')

    self.results = {
            'advertisement_added': advertisement_added,
            'manufacturer_data_found': manufacturer_data_found,
            'service_uuids_found': service_uuids_found,
            'service_data_found': service_data_found,
            'min_adv_interval_ms_found': min_adv_interval_ms_found,
            'max_adv_interval_ms_found': max_adv_interval_ms_found,
            'advertising_enabled': advertising_enabled,
    }
    return all(self.results.values())
@_test_retry_and_log(False)
def test_fail_to_register_advertisement(self, advertisement_data,
                                        min_adv_interval_ms,
                                        max_adv_interval_ms):
    """Verify that failure is incurred when max advertisements are reached.

    Registering one advertisement too many is expected to produce an
    error message that looks like:

        org.bluez.Error.Failed: Maximum advertisements reached

    while the existing advertising state stays untouched.

    @param advertisement_data: the advertisement to register.
    @param min_adv_interval_ms: min_adv_interval in milliseconds.
    @param max_adv_interval_ms: max_adv_interval in milliseconds.

    @returns: True if the error message is received correctly.
              False otherwise.

    """
    def _register():
        """Attempt the registration that is expected to be rejected."""
        return self.bluetooth_le_facade.register_advertisement(
                advertisement_data)

    timespan = self.compute_logging_timespan(max_adv_interval_ms)
    self._get_btmon_log(_register, logging_timespan=timespan)

    outcome = {}

    # The registration message must carry the "max advertisements
    # reached" error.
    outcome['failed_to_register_error'] = (
            self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT in
            self.advertising_msg)

    # No new advertisement may show up in the btmon log.
    outcome['advertisement_not_added'] = (
            not self.bluetooth_le_facade.btmon_find('Advertising Added:'))

    # The advertising intervals must still be the expected ones.
    min_found, max_found = self._verify_advertising_intervals(
            min_adv_interval_ms, max_adv_interval_ms)
    outcome['min_adv_interval_ms_found'] = min_found
    outcome['max_adv_interval_ms_found'] = max_found

    # Advertising must remain enabled.
    outcome['advertising_enabled'] = self.bluetooth_le_facade.btmon_find(
            'Advertising: Enabled (0x01)')

    self.results = outcome
    return all(self.results.values())
@_test_retry_and_log(False)
def test_unregister_advertisement(self, advertisement_data, instance_id,
                                  advertising_disabled):
    """Verify that an advertisement is unregistered correctly.

    This test verifies the following data:
    - advertisement removed
    - advertising status: enabled if there are advertisements left;
                          disabled otherwise.

    @param advertisement_data: the data of an advertisement to unregister.
    @param instance_id: the instance id of the advertisement to remove.
    @param advertising_disabled: is advertising disabled? This happens
            only when all advertisements are removed.

    @returns: True if the advertisement is unregistered correctly.
              False otherwise.

    """
    def _unregister():
        """Ask the DUT to unregister the advertisement."""
        return self.bluetooth_le_facade.unregister_advertisement(
                advertisement_data)

    self.count_advertisements -= 1
    self._get_btmon_log(_unregister)

    # The removal is confirmed by the 'Advertising Removed' event together
    # with the matching instance id.
    removed = self.bluetooth_le_facade.btmon_find('Advertising Removed')
    if removed:
        removed = self.bluetooth_le_facade.btmon_find(
                'Instance: %d' % instance_id)

    # The advertising status is only checked when the last advertisement
    # is removed (advertising_disabled is True). For earlier removals
    # 'Advertising: Enabled (0x01)' may or may not be logged, so insisting
    # on it would make the test flaky. The test cases run
    # test_check_duration_and_intervals() after un-registering, which
    # covers the advertisements that remain.
    enabled_seen = self.bluetooth_le_facade.btmon_find(
            'Advertising: Enabled (0x01)')
    disabled_seen = self.bluetooth_le_facade.btmon_find(
            'Advertising: Disabled (0x00)')
    if advertising_disabled:
        status_correct = disabled_seen and not enabled_seen
    else:
        status_correct = True

    self.results = {
            'advertisement_removed': removed,
            'advertising_status_correct': status_correct,
    }
    return all(self.results.values())
@_test_retry_and_log(False)
def test_set_advertising_intervals(self, min_adv_interval_ms,
                                   max_adv_interval_ms):
    """Verify that new advertising intervals are set correctly.

    Note that setting advertising intervals does not enable/disable
    advertising. Hence, there is no need to check the advertising
    status.

    @param min_adv_interval_ms: the min advertising interval in ms.
    @param max_adv_interval_ms: the max advertising interval in ms.

    @returns: True if the new advertising intervals are correct.
              False otherwise.

    """
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.set_advertising_intervals(
                    min_adv_interval_ms, max_adv_interval_ms))

    # Verify the new advertising intervals.
    # With intervals of 200 ms and 200 ms, the log looks like
    #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
    # convert_to_adv_jiffies() divides by ADVERTISING_INTERVAL_UNIT and
    # may return a float, which '%x' rejects on Python 3; convert to int
    # first.
    txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
    adv_intervals_found = self.bluetooth_le_facade.btmon_find(
            txt % (int(self.convert_to_adv_jiffies(min_adv_interval_ms)),
                   int(self.convert_to_adv_jiffies(max_adv_interval_ms))))

    self.results = {'adv_intervals_found': adv_intervals_found}
    return all(self.results.values())
@_test_retry_and_log(False)
def test_fail_to_set_advertising_intervals(
        self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
        orig_min_adv_interval_ms, orig_max_adv_interval_ms):
    """Verify that setting invalid advertising intervals results in error.

    If invalid min/max advertising intervals are given, it would incur
    the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
    Note that valid advertising intervals fall between 20 ms and 10,240 ms.

    @param invalid_min_adv_interval_ms: the invalid min advertising interval
            in ms.
    @param invalid_max_adv_interval_ms: the invalid max advertising interval
            in ms.
    @param orig_min_adv_interval_ms: the original min advertising interval
            in ms.
    @param orig_max_adv_interval_ms: the original max advertising interval
            in ms.

    @returns: True if it fails to set invalid advertising intervals.
              False otherwise.

    """
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.set_advertising_intervals(
                    invalid_min_adv_interval_ms,
                    invalid_max_adv_interval_ms))

    # Verify that the invalid error is observed in the dbus error callback
    # message.
    invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
                               self.advertising_msg)

    # Verify that the min/max advertising intervals remain the same
    # after setting the invalid advertising intervals.
    #
    # In btmon log, we would see the following message first.
    #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
    # And then, we should check if "Min advertising interval" and
    # "Max advertising interval" remain the same.
    # '%x' needs integers, while convert_to_adv_jiffies() may return a
    # float.
    start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
            int(self.convert_to_adv_jiffies(invalid_min_adv_interval_ms)),
            int(self.convert_to_adv_jiffies(invalid_max_adv_interval_ms)))

    search_strings = ['Min advertising interval:',
                      'Max advertising interval:']
    search_str = '|'.join(search_strings)

    contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
                                                  start_str=start_str)

    # The min/max advertising intervals of all advertisements should remain
    # the same as the previous valid ones.
    # Use a real alternation: '[Min|Max]' is a character class that
    # matches a single character and only worked by accident.
    min_max_pattern = re.compile(
            r'(Min|Max) advertising interval: (\d+\.\d+) msec')
    expected_intervals = {'Min': orig_min_adv_interval_ms,
                          'Max': orig_max_adv_interval_ms}
    interval_correct = {'Min': True, 'Max': True}
    for line in contents:
        result = min_max_pattern.search(line)
        if result:
            kind = result.group(1)
            if float(result.group(2)) != expected_intervals[kind]:
                interval_correct[kind] = False

    self.results = {
            'invalid_intervals_error': invalid_intervals_error,
            'correct_orig_min_adv_interval': interval_correct['Min'],
            'correct_orig_max_adv_interval': interval_correct['Max']}

    return all(self.results.values())
@_test_retry_and_log(False)
def test_check_advertising_intervals(self, min_adv_interval_ms,
                                     max_adv_interval_ms):
    """Verify that the advertising intervals are as expected.

    Captures the btmon log without issuing any command and looks for
    the expected min/max interval strings.

    @param min_adv_interval_ms: the min advertising interval in ms.
    @param max_adv_interval_ms: the max advertising interval in ms.

    @returns: True if the advertising intervals are correct.
              False otherwise.

    """
    self._get_btmon_log(None)

    # Verify that the advertising intervals are correct.
    min_adv_interval_ms_found, max_adv_interval_ms_found = (
            self._verify_advertising_intervals(min_adv_interval_ms,
                                               max_adv_interval_ms))

    self.results = {
            'min_adv_interval_ms_found': min_adv_interval_ms_found,
            'max_adv_interval_ms_found': max_adv_interval_ms_found,
    }
    return all(self.results.values())


@_test_retry_and_log(False)
def test_reset_advertising(self, instance_ids=None):
    """Verify that advertising is reset correctly.

    Note that reset advertising would set advertising intervals to
    the default values. However, we would not be able to observe
    the values change until new advertisements are registered.
    Therefore, it is required that a test_register_advertisement()
    test is conducted after this test.

    If instance_ids is empty (or omitted), all advertisements would still
    be removed if there are any. However, no need to check
    'Advertising Removed' in btmon log since we may or may not be able to
    observe the message. This feature is needed if this test is invoked
    as the first one in a test case to reset advertising. In this
    situation, this test does not know how many advertisements exist.

    @param instance_ids: the list of instance IDs that should be removed.

    @returns: True if advertising is reset correctly.
              False otherwise.

    """
    # Avoid a mutable default argument and accept any sequence; an empty
    # tuple is treated like an empty list.
    instance_ids = list(instance_ids) if instance_ids else []

    self.count_advertisements = 0
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.reset_advertising())

    # Verify that every advertisement is removed. When an advertisement
    # with instance id 1 is removed, the log looks like
    #   Advertising Removed
    #       instance: 1
    if instance_ids:
        advertisement_removed = self.bluetooth_le_facade.btmon_find(
                'Advertising Removed')
        if advertisement_removed:
            advertisement_removed = all(
                    self.bluetooth_le_facade.btmon_find(
                            'Instance: %d' % instance_id)
                    for instance_id in instance_ids)
    else:
        advertisement_removed = True

    if not advertisement_removed:
        logging.error('Failed to remove advertisement')

    # Verify that "Reset Advertising Intervals" command has been issued.
    reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
            'bluetoothd: Reset Advertising Intervals')

    # Verify the advertising is disabled.
    advertising_disabled_observed = self.bluetooth_le_facade.btmon_find(
            'Advertising: Disabled')
    # If there are no existing advertisements, we may not observe
    # 'Advertising: Disabled'.
    advertising_disabled = (not instance_ids or
                            advertising_disabled_observed)

    self.results = {
            'advertisement_removed': advertisement_removed,
            'reset_advertising_intervals': reset_advertising_intervals,
            'advertising_disabled': advertising_disabled,
    }
    return all(self.results.values())
def add_device(self, address, address_type, action):
    """Add a device to the Kernel action list.

    @param address: address of the device.
    @param address_type: address type, passed through to the facade.
    @param action: action, passed through to the facade.

    @returns: whatever bluetooth_facade.add_device() returns.
    """
    facade = self.bluetooth_facade
    return facade.add_device(address, address_type, action)


def remove_device(self, address, address_type):
    """Remove a device from the Kernel action list.

    @param address: address of the device.
    @param address_type: address type, passed through to the facade.

    @returns: whatever bluetooth_facade.remove_device() returns.
    """
    facade = self.bluetooth_facade
    return facade.remove_device(address, address_type)


def read_supported_commands(self):
    """Read the set of supported commands from the Kernel.

    @returns: whatever bluetooth_facade.read_supported_commands() returns.
    """
    facade = self.bluetooth_facade
    return facade.read_supported_commands()


def read_info(self):
    """Read the adapter information from the Kernel.

    @returns: whatever bluetooth_facade.read_info() returns.
    """
    facade = self.bluetooth_facade
    return facade.read_info()


def get_adapter_properties(self):
    """Read the adapter properties from the Bluetooth Daemon.

    @returns: whatever bluetooth_facade.get_adapter_properties() returns.
    """
    facade = self.bluetooth_facade
    return facade.get_adapter_properties()


def get_dev_info(self):
    """Read raw HCI device information.

    @returns: whatever bluetooth_facade.get_dev_info() returns.
    """
    facade = self.bluetooth_facade
    return facade.get_dev_info()


def log_settings(self, msg, settings):
    """Convert MGMT_OP_READ_INFO settings to a list of names and log it.

    @param msg: string to include in output
    @param settings: bitstring returned by MGMT_OP_READ_INFO
    @return : List of strings indicating different settings
    """
    # (bit mask, label) pairs, in the order the labels are reported.
    setting_labels = (
            (bluetooth_socket.MGMT_SETTING_POWERED, "POWERED"),
            (bluetooth_socket.MGMT_SETTING_CONNECTABLE, "CONNECTABLE"),
            (bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE,
             "FAST-CONNECTABLE"),
            (bluetooth_socket.MGMT_SETTING_DISCOVERABLE, "DISCOVERABLE"),
            (bluetooth_socket.MGMT_SETTING_PAIRABLE, "PAIRABLE"),
            (bluetooth_socket.MGMT_SETTING_LINK_SECURITY, "LINK-SECURITY"),
            (bluetooth_socket.MGMT_SETTING_SSP, "SSP"),
            (bluetooth_socket.MGMT_SETTING_BREDR, "BR/EDR"),
            (bluetooth_socket.MGMT_SETTING_HS, "HS"),
            (bluetooth_socket.MGMT_SETTING_LE, "LE"),
    )
    strs = [label for mask, label in setting_labels if settings & mask]
    logging.debug('%s : %s', msg, " ".join(strs))
    return strs
def log_flags(self, msg, flags):
    """Convert HCI state configuration flags to a list of names and log it.

    The first entry is always "UP" or "DOWN", depending on the HCI_UP
    bit; the names of the other set flags follow.

    @param msg: string to include in output
    @param flags: bitstring returned by get_dev_info
    @return : List of strings indicating different flags
    """
    if flags & bluetooth_socket.HCI_UP:
        strs = ["UP"]
    else:
        strs = ["DOWN"]

    # (bit mask, label) pairs, in the order the labels are reported.
    flag_labels = (
            (bluetooth_socket.HCI_INIT, "INIT"),
            (bluetooth_socket.HCI_RUNNING, "RUNNING"),
            (bluetooth_socket.HCI_PSCAN, "PSCAN"),
            (bluetooth_socket.HCI_ISCAN, "ISCAN"),
            (bluetooth_socket.HCI_AUTH, "AUTH"),
            (bluetooth_socket.HCI_ENCRYPT, "ENCRYPT"),
            (bluetooth_socket.HCI_INQUIRY, "INQUIRY"),
            (bluetooth_socket.HCI_RAW, "RAW"),
    )
    strs.extend(label for mask, label in flag_labels if flags & mask)
    logging.debug('%s [HCI]: %s', msg, " ".join(strs))
    return strs


@_test_retry_and_log(False)
def test_service_resolved(self, address):
    """Test that the services under device address can be resolved

    @param address: MAC address of a device

    @returns: True if the ServicesResolved property is changed before
              timeout, False otherwise.

    """
    services_resolved = self.bluetooth_facade.device_services_resolved

    def _is_resolved():
        """Ask the facade whether the device's services are resolved."""
        return services_resolved(address)

    # method_name() inspects the calling frame, so it has to be called
    # directly from this method.
    return self._wait_for_condition(_is_resolved, method_name())
@param address: MAC address of a device 2692 2693 @returns: True if the ServicesResolved property is changed before 2694 timeout, False otherwise. 2695 2696 """ 2697 is_resolved_func = self.bluetooth_facade.device_services_resolved 2698 return self._wait_for_condition(lambda : is_resolved_func(address),\ 2699 method_name()) 2700 2701 2702 @_test_retry_and_log(False) 2703 def test_gatt_browse(self, address): 2704 """Test that the GATT client can get the attributes correctly 2705 2706 @param address: MAC address of a device 2707 2708 @returns: True if the attribute map received by GATT client is the same 2709 as expected. False otherwise. 2710 2711 """ 2712 2713 gatt_client_facade = GATT_ClientFacade(self.bluetooth_facade) 2714 actual_app = gatt_client_facade.browse(address) 2715 expected_app = GATT_HIDApplication() 2716 diff = GATT_Application.diff(actual_app, expected_app) 2717 2718 self.result = { 2719 'actural_result': actual_app, 2720 'expected_result': expected_app 2721 } 2722 2723 gatt_attribute_hierarchy = ['Device', 'Service', 'Characteristic', 2724 'Descriptor'] 2725 # Remove any difference in object path 2726 for parent, child in zip(gatt_attribute_hierarchy, 2727 gatt_attribute_hierarchy[1:]): 2728 pattern = re.compile('^%s .* is different in %s' % (child, parent)) 2729 for diff_str in diff[::]: 2730 if pattern.search(diff_str): 2731 diff.remove(diff_str) 2732 2733 if len(diff) != 0: 2734 logging.error('Application Diff: %s', diff) 2735 return False 2736 return True 2737 2738 2739 # ------------------------------------------------------------------- 2740 # Bluetooth mouse related tests 2741 # ------------------------------------------------------------------- 2742 2743 2744 def _record_input_events(self, device, gesture): 2745 """Record the input events. 2746 2747 @param device: the bluetooth HID device. 2748 @param gesture: the gesture method to perform. 2749 2750 @returns: the input events received on the DUT. 
2751 2752 """ 2753 self.input_facade.initialize_input_recorder(device.name) 2754 self.input_facade.start_input_recorder() 2755 time.sleep(self.HID_REPORT_SLEEP_SECS) 2756 gesture() 2757 time.sleep(self.HID_REPORT_SLEEP_SECS) 2758 self.input_facade.stop_input_recorder() 2759 time.sleep(self.HID_REPORT_SLEEP_SECS) 2760 event_values = self.input_facade.get_input_events() 2761 events = [Event(*ev) for ev in event_values] 2762 return events 2763 2764 2765 def _test_mouse_click(self, device, button): 2766 """Test that the mouse click events could be received correctly. 2767 2768 @param device: the meta device containing a bluetooth HID device 2769 @param button: which button to test, 'LEFT' or 'RIGHT' 2770 2771 @returns: True if the report received by the host matches the 2772 expected one. False otherwise. 2773 2774 """ 2775 if button == 'LEFT': 2776 gesture = device.LeftClick 2777 elif button == 'RIGHT': 2778 gesture = device.RightClick 2779 else: 2780 raise error.TestError('Button (%s) is not valid.' % button) 2781 2782 actual_events = self._record_input_events(device, gesture) 2783 2784 linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT} 2785 expected_events = [ 2786 # Button down 2787 recorder.MSC_SCAN_BTN_EVENT[button], 2788 Event(EV_KEY, linux_input_button[button], 1), 2789 recorder.SYN_EVENT, 2790 # Button up 2791 recorder.MSC_SCAN_BTN_EVENT[button], 2792 Event(EV_KEY, linux_input_button[button], 0), 2793 recorder.SYN_EVENT] 2794 2795 self.results = { 2796 'actual_events': map(str, actual_events), 2797 'expected_events': map(str, expected_events)} 2798 return actual_events == expected_events 2799 2800 2801 @_test_retry_and_log 2802 def test_mouse_left_click(self, device): 2803 """Test that the mouse left click events could be received correctly. 2804 2805 @param device: the meta device containing a bluetooth HID device 2806 2807 @returns: True if the report received by the host matches the 2808 expected one. False otherwise. 
2809 2810 """ 2811 return self._test_mouse_click(device, 'LEFT') 2812 2813 2814 @_test_retry_and_log 2815 def test_mouse_right_click(self, device): 2816 """Test that the mouse right click events could be received correctly. 2817 2818 @param device: the meta device containing a bluetooth HID device 2819 2820 @returns: True if the report received by the host matches the 2821 expected one. False otherwise. 2822 2823 """ 2824 return self._test_mouse_click(device, 'RIGHT') 2825 2826 2827 def _test_mouse_move(self, device, delta_x=0, delta_y=0): 2828 """Test that the mouse move events could be received correctly. 2829 2830 @param device: the meta device containing a bluetooth HID device 2831 @param delta_x: the distance to move cursor in x axis 2832 @param delta_y: the distance to move cursor in y axis 2833 2834 @returns: True if the report received by the host matches the 2835 expected one. False otherwise. 2836 2837 """ 2838 gesture = lambda: device.Move(delta_x, delta_y) 2839 actual_events = self._record_input_events(device, gesture) 2840 2841 events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else [] 2842 events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else [] 2843 expected_events = events_x + events_y + [recorder.SYN_EVENT] 2844 2845 self.results = { 2846 'actual_events': map(str, actual_events), 2847 'expected_events': map(str, expected_events)} 2848 return actual_events == expected_events 2849 2850 2851 @_test_retry_and_log 2852 def test_mouse_move_in_x(self, device, delta_x): 2853 """Test that the mouse move events in x could be received correctly. 2854 2855 @param device: the meta device containing a bluetooth HID device 2856 @param delta_x: the distance to move cursor in x axis 2857 2858 @returns: True if the report received by the host matches the 2859 expected one. False otherwise. 
2860 2861 """ 2862 return self._test_mouse_move(device, delta_x=delta_x) 2863 2864 2865 @_test_retry_and_log 2866 def test_mouse_move_in_y(self, device, delta_y): 2867 """Test that the mouse move events in y could be received correctly. 2868 2869 @param device: the meta device containing a bluetooth HID device 2870 @param delta_y: the distance to move cursor in y axis 2871 2872 @returns: True if the report received by the host matches the 2873 expected one. False otherwise. 2874 2875 """ 2876 return self._test_mouse_move(device, delta_y=delta_y) 2877 2878 2879 @_test_retry_and_log 2880 def test_mouse_move_in_xy(self, device, delta_x, delta_y): 2881 """Test that the mouse move events could be received correctly. 2882 2883 @param device: the meta device containing a bluetooth HID device 2884 @param delta_x: the distance to move cursor in x axis 2885 @param delta_y: the distance to move cursor in y axis 2886 2887 @returns: True if the report received by the host matches the 2888 expected one. False otherwise. 2889 2890 """ 2891 return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y) 2892 2893 2894 def _test_mouse_scroll(self, device, units): 2895 """Test that the mouse wheel events could be received correctly. 2896 2897 @param device: the meta device containing a bluetooth HID device 2898 @param units: the units to scroll in y axis 2899 2900 @returns: True if the report received by the host matches the 2901 expected one. False otherwise. 2902 2903 """ 2904 gesture = lambda: device.Scroll(units) 2905 actual_events = self._record_input_events(device, gesture) 2906 expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT] 2907 self.results = { 2908 'actual_events': map(str, actual_events), 2909 'expected_events': map(str, expected_events)} 2910 return actual_events == expected_events 2911 2912 2913 @_test_retry_and_log 2914 def test_mouse_scroll_down(self, device, delta_y): 2915 """Test that the mouse wheel events could be received correctly. 
2916 2917 @param device: the meta device containing a bluetooth HID device 2918 @param delta_y: the units to scroll down in y axis; 2919 should be a postive value 2920 2921 @returns: True if the report received by the host matches the 2922 expected one. False otherwise. 2923 2924 """ 2925 if delta_y > 0: 2926 return self._test_mouse_scroll(device, delta_y) 2927 else: 2928 raise error.TestError('delta_y (%d) should be a positive value', 2929 delta_y) 2930 2931 2932 @_test_retry_and_log 2933 def test_mouse_scroll_up(self, device, delta_y): 2934 """Test that the mouse wheel events could be received correctly. 2935 2936 @param device: the meta device containing a bluetooth HID device 2937 @param delta_y: the units to scroll up in y axis; 2938 should be a postive value 2939 2940 @returns: True if the report received by the host matches the 2941 expected one. False otherwise. 2942 2943 """ 2944 if delta_y > 0: 2945 return self._test_mouse_scroll(device, -delta_y) 2946 else: 2947 raise error.TestError('delta_y (%d) should be a positive value', 2948 delta_y) 2949 2950 2951 @_test_retry_and_log 2952 def test_mouse_click_and_drag(self, device, delta_x, delta_y): 2953 """Test that the mouse click-and-drag events could be received 2954 correctly. 2955 2956 @param device: the meta device containing a bluetooth HID device 2957 @param delta_x: the distance to drag in x axis 2958 @param delta_y: the distance to drag in y axis 2959 2960 @returns: True if the report received by the host matches the 2961 expected one. False otherwise. 
2962 2963 """ 2964 gesture = lambda: device.ClickAndDrag(delta_x, delta_y) 2965 actual_events = self._record_input_events(device, gesture) 2966 2967 button = 'LEFT' 2968 expected_events = ( 2969 [# Button down 2970 recorder.MSC_SCAN_BTN_EVENT[button], 2971 Event(EV_KEY, BTN_LEFT, 1), 2972 recorder.SYN_EVENT] + 2973 # cursor movement in x and y 2974 ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) + 2975 ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) + 2976 [recorder.SYN_EVENT] + 2977 # Button up 2978 [recorder.MSC_SCAN_BTN_EVENT[button], 2979 Event(EV_KEY, BTN_LEFT, 0), 2980 recorder.SYN_EVENT]) 2981 2982 self.results = { 2983 'actual_events': map(str, actual_events), 2984 'expected_events': map(str, expected_events)} 2985 return actual_events == expected_events 2986 2987 2988 # ------------------------------------------------------------------- 2989 # Bluetooth keyboard related tests 2990 # ------------------------------------------------------------------- 2991 2992 # TODO may be deprecated as stated in b:140515628 2993 @_test_retry_and_log 2994 def test_keyboard_input_from_string(self, device, string_to_send): 2995 """Test that the keyboard's key events could be received correctly. 2996 2997 @param device: the meta device containing a bluetooth HID device 2998 @param string_to_send: the set of keys that will be pressed one-by-one 2999 3000 @returns: True if the report received by the host matches the 3001 expected one. False otherwise. 
3002 3003 """ 3004 3005 gesture = lambda: device.KeyboardSendString(string_to_send) 3006 3007 actual_events = self._record_input_events(device, gesture) 3008 3009 resulting_string = bluetooth_test_utils.reconstruct_string( 3010 actual_events) 3011 3012 return string_to_send == resulting_string 3013 3014 3015 @_test_retry_and_log 3016 def test_keyboard_input_from_trace(self, device, trace_name): 3017 """ Tests that keyboard events can be transmitted and received correctly 3018 3019 @param device: the meta device containing a bluetooth HID device 3020 @param trace_name: string name for keyboard activity trace to be used 3021 in the test i.e. "simple_text" 3022 3023 @returns: true if the recorded output matches the expected output 3024 false otherwise 3025 """ 3026 3027 # Read data from trace I/O files 3028 input_trace = bluetooth_test_utils.parse_trace_file(os.path.join( 3029 TRACE_LOCATION, '{}_input.txt'.format(trace_name))) 3030 output_trace = bluetooth_test_utils.parse_trace_file(os.path.join( 3031 TRACE_LOCATION, '{}_output.txt'.format(trace_name))) 3032 3033 if not input_trace or not output_trace: 3034 logging.error('Failure in using trace') 3035 return False 3036 3037 # Disregard timing data for now 3038 input_scan_codes = [tup[1] for tup in input_trace] 3039 predicted_events = [Event(*tup[1]) for tup in output_trace] 3040 3041 # Create and run this trace as a gesture 3042 gesture = lambda: device.KeyboardSendTrace(input_scan_codes) 3043 rec_events = self._record_input_events(device, gesture) 3044 3045 # Filter out any input events that were not from the keyboard 3046 rec_key_events = [ev for ev in rec_events if ev.type == EV_KEY] 3047 3048 # Fail if we didn't record the correct number of events 3049 if len(rec_key_events) != len(input_scan_codes): 3050 return False 3051 3052 for idx, predicted in enumerate(predicted_events): 3053 recorded = rec_key_events[idx] 3054 3055 if not predicted == recorded: 3056 return False 3057 3058 return True 3059 3060 3061 def 
is_newer_kernel_version(self, version, minimum_version): 3062 """ Check if given kernel version is newer than unsupported version.""" 3063 3064 return utils.compare_versions(version, minimum_version) >= 0 3065 3066 3067 def is_supported_kernel_version(self, kernel_version, minimum_version, 3068 msg=None): 3069 """ Check if kernel version is greater than minimum version. 3070 3071 Check if given kernel version is greater than or equal to minimum 3072 version. Raise TEST_NA if given kernel version is lower than the 3073 minimum version. 3074 3075 Note: Kernel version may have suffixes, so ensure that minimum 3076 version should be the smallest version that is permissible. 3077 Ex: If minimum version is 3.8.11 then 3.8.11-<random> will 3078 pass the check. 3079 3080 @param kernel_version: kernel version to be checked as a string 3081 @param: minimum_version: minimum kernel version requried 3082 3083 @returns: None 3084 3085 @raises: TEST_NA if kernel version is not greater than the minimum 3086 version 3087 """ 3088 3089 logging.debug('kernel version is {} minimum version' 3090 'is {}'.format(kernel_version,minimum_version)) 3091 3092 if msg is None: 3093 msg = 'Test not supported on this kernel version' 3094 3095 if not self.is_newer_kernel_version(kernel_version, minimum_version): 3096 logging.debug('Kernel version check failed. 
Exiting the test') 3097 raise error.TestNAError(msg) 3098 3099 logging.debug('Kernel version check passed') 3100 3101 3102 # ------------------------------------------------------------------- 3103 # Servod related tests 3104 # ------------------------------------------------------------------- 3105 3106 @_test_retry_and_log 3107 def test_power_consumption(self, max_power_mw): 3108 """Test the average power consumption.""" 3109 power_mw = self.device.servod.MeasurePowerConsumption() 3110 self.results = {'power_mw': power_mw} 3111 3112 if (power_mw is None): 3113 logging.error('Failed to measure power consumption') 3114 return False 3115 3116 power_mw = float(power_mw) 3117 logging.info('power consumption (mw): %f (max allowed: %f)', 3118 power_mw, max_power_mw) 3119 3120 return power_mw <= max_power_mw 3121 3122 3123 @_test_retry_and_log 3124 def test_start_notify(self, address, uuid, cccd_value): 3125 """Test that a notification can be started on a characteristic 3126 3127 @param address: The MAC address of the remote device. 3128 @param uuid: The uuid of the characteristic. 3129 @param cccd_value: Possible CCCD values include 3130 0x00 - inferred from the remote characteristic's properties 3131 0x01 - notification 3132 0x02 - indication 3133 3134 @returns: The test results. 3135 3136 """ 3137 start_notify = self.bluetooth_facade.start_notify( 3138 address, uuid, cccd_value) 3139 is_notifying = self._wait_for_condition( 3140 lambda: self.bluetooth_facade.is_notifying( 3141 address, uuid), method_name()) 3142 3143 self.results = { 3144 'start_notify': start_notify, 3145 'is_notifying': is_notifying} 3146 3147 return all(self.results.values()) 3148 3149 3150 @_test_retry_and_log 3151 def test_stop_notify(self, address, uuid): 3152 """Test that a notification can be stopped on a characteristic 3153 3154 @param address: The MAC address of the remote device. 3155 @param uuid: The uuid of the characteristic. 3156 3157 @returns: The test results. 
3158 3159 """ 3160 stop_notify = self.bluetooth_facade.stop_notify(address, uuid) 3161 is_not_notifying = self._wait_for_condition( 3162 lambda: not self.bluetooth_facade.is_notifying( 3163 address, uuid), method_name()) 3164 3165 self.results = { 3166 'stop_notify': stop_notify, 3167 'is_not_notifying': is_not_notifying} 3168 3169 return all(self.results.values()) 3170 3171 3172 # ------------------------------------------------------------------- 3173 # Autotest methods 3174 # ------------------------------------------------------------------- 3175 3176 3177 def initialize(self): 3178 """Initialize bluetooth adapter tests.""" 3179 # Run through every tests and collect failed tests in self.fails. 3180 self.fails = [] 3181 3182 # If a test depends on multiple conditions, write the results of 3183 # the conditions in self.results so that it is easy to know 3184 # what conditions failed by looking at the log. 3185 self.results = None 3186 3187 # Some tests may instantiate a peripheral device for testing. 3188 self.devices = dict() 3189 for device_type in SUPPORTED_DEVICE_TYPES: 3190 self.devices[device_type] = list() 3191 3192 # The count of registered advertisements. 3193 self.count_advertisements = 0 3194 3195 3196 def check_chameleon(self): 3197 """Check the existence of chameleon_host. 3198 3199 The chameleon_host is specified in --args as follows 3200 3201 (cr) $ test_that --args "chameleon_host=$CHAMELEON_IP" "$DUT_IP" <test> 3202 3203 """ 3204 logging.debug('labels: %s', self.host.get_labels()) 3205 if self.host.chameleon is None: 3206 raise error.TestError('Have to specify chameleon_host IP.') 3207 3208 3209 def run_once(self, *args, **kwargs): 3210 """This method should be implemented by children classes. 3211 3212 Typically, the run_once() method would look like: 3213 3214 factory = remote_facade_factory.RemoteFacadeFactory(host) 3215 self.bluetooth_facade = factory.create_bluetooth_hid_facade() 3216 3217 self.test_bluetoothd_running() 3218 # ... 
3219 # invoke more self.test_xxx() tests. 3220 # ... 3221 3222 if self.fails: 3223 raise error.TestFail(self.fails) 3224 3225 """ 3226 raise NotImplementedError 3227 3228 3229 def cleanup(self, test_state='END'): 3230 """Clean up bluetooth adapter tests. 3231 3232 @param test_state: string describing the requested clear is for 3233 a new test(NEW), the middle of the test(MID), 3234 or the end of the test(END). 3235 """ 3236 3237 if test_state is 'END': 3238 # Disable all the bluetooth debug logs 3239 self.enable_disable_debug_log(enable=False) 3240 3241 if hasattr(self, 'host'): 3242 # Stop btmon process 3243 self.host.run('pkill btmon || true') 3244 3245 # Close the device properly if a device is instantiated. 3246 # Note: do not write something like the following statements 3247 # if self.devices[device_type]: 3248 # or 3249 # if bool(self.devices[device_type]): 3250 # Otherwise, it would try to invoke bluetooth_mouse.__nonzero__() 3251 # which just does not exist. 3252 for device_name, device_list in self.devices.items(): 3253 for device in device_list: 3254 if device is not None: 3255 device.Close() 3256 3257 # Power cycle BT device if we're in the middle of a test 3258 if test_state is 'MID': 3259 device.PowerCycle() 3260 3261 self.devices = dict() 3262 for device_type in SUPPORTED_DEVICE_TYPES: 3263 self.devices[device_type] = list() 3264