1# Lint as: python2, python3 2# Copyright 2020 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5"""Facade to access the bluetooth-related functionality.""" 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import base64 12import binascii 13import collections 14from datetime import datetime, timedelta 15import glob 16# AU tests use ToT client code, but ToT -3 client version. 17try: 18 from gi.repository import GLib, GObject 19except ImportError: 20 import gobject as GObject 21import json 22import logging 23import logging.handlers 24import os 25 26# TODO(b/215715213) - Wait until ebuild runs as python3 to remove this try 27try: 28 import pydbus 29except Exception as e: 30 import platform 31 logging.error('Unable to import pydbus at version=%s: %s', 32 platform.python_version(), e) 33 pydbus = {} 34 35import re 36import subprocess 37import functools 38import time 39import threading 40import traceback 41 42import common 43from autotest_lib.client.bin import utils 44from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket 45from autotest_lib.client.common_lib import error 46from autotest_lib.client.cros.udev_helpers import UdevadmInfo, UdevadmTrigger 47from autotest_lib.client.cros.audio import (audio_test_data as 48 audio_test_data_module) 49from autotest_lib.client.cros.audio import check_quality 50from autotest_lib.client.cros.audio import cras_utils 51from autotest_lib.client.cros.audio.sox_utils import ( 52 convert_format, convert_raw_file, get_file_length, 53 trim_silence_from_wav_file) 54from autotest_lib.client.cros.bluetooth import advertisement 55from autotest_lib.client.cros.bluetooth import adv_monitor_helper 56from autotest_lib.client.cros.bluetooth import output_recorder 57from autotest_lib.client.cros.bluetooth import logger_helper 58from autotest_lib.client.cros.bluetooth.floss.adapter_client import ( 59 FlossAdapterClient, BluetoothCallbacks, BluetoothConnectionCallbacks, 60 BondState, SspVariant, Transport) 61from autotest_lib.client.cros.bluetooth.floss.manager_client import FlossManagerClient 62from autotest_lib.client.cros.bluetooth.floss.utils import GLIB_THREAD_NAME 63from autotest_lib.client.cros.power import sys_power 64import six 65from six.moves import map 66from six.moves import range 67 68CheckQualityArgsClass = collections.namedtuple( 69 'args_type', ['filename', 'rate', 'channel', 'bit_width']) 70 71 72def _dbus_byte_array_to_b64_string(dbus_byte_array): 73 """Base64 encodes a dbus byte array for use with the xml rpc proxy. 74 75 Input is encoded to bytes using base64 encoding. Then the base64 bytes is 76 decoded as string. 77 """ 78 return base64.standard_b64encode(bytearray(dbus_byte_array)).decode() 79 80 81def _b64_string_to_dbus_byte_array(b64_string): 82 """Base64 decodes a dbus byte array for use with the xml rpc proxy.""" 83 dbus_array = [] 84 bytes = bytearray(base64.standard_b64decode(b64_string)) 85 for byte in bytes: 86 dbus_array.append(byte) 87 return dbus_array 88 89 90def dbus_safe(default_return_value, return_error=False): 91 """Catch all DBus exceptions and return a default value instead. 92 93 Wrap a function with a try block that catches DBus exceptions and 94 returns the error with the specified return status. The exception is logged 95 to aid in debugging. 96 97 If |return_error| is set, the call will return a tuple with 98 (default_return_value, str(error)). 99 100 @param default_return_value: What value to return in case of errors. 101 @param return_error: Whether to return the error string as well. 102 103 @return Either the return value from the method call if successful or 104 the |default_return_value| or a tuple(default_return_value, 105 str(error)) 106 """ 107 108 def decorator(wrapped_function): 109 """Call a function and catch DBus errors. 110 111 @param wrapped_function function to call in dbus safe context. 112 @return function return value or default_return_value on failure. 113 114 """ 115 116 @functools.wraps(wrapped_function) 117 def wrapper(*args, **kwargs): 118 """Pass args and kwargs to a dbus safe function. 119 120 @param args formal python arguments. 121 @param kwargs keyword python arguments. 122 @return function return value or default_return_value on failure. 123 124 """ 125 logging.debug('%s()', wrapped_function.__name__) 126 try: 127 return wrapped_function(*args, **kwargs) 128 except GLib.Error as e: 129 logging.debug('Exception while performing operation %s: %s', 130 wrapped_function.__name__, e) 131 132 if return_error: 133 return (default_return_value, str(e)) 134 else: 135 return default_return_value 136 except Exception as e: 137 logging.debug('Exception in %s: %s', wrapped_function.__name__, 138 e) 139 logging.debug(traceback.format_exc()) 140 raise 141 142 return wrapper 143 144 return decorator 145 146 147def raw_dbus_call_sync(bus, 148 proxy, 149 iface, 150 method, 151 variant_in_args, 152 variant_out_type, 153 timeout_ms=None): 154 """Makes a raw D-Bus call and returns the unpacked result. 155 156 @param bus: System bus object. 157 @param proxy: Proxy object. 158 @param iface: D-Bus interface that exposes this method. 159 @param method: Name of method to call. 160 @param variant_in_args: A Glib.Variant that corresponds to the method's 161 inputs. 162 @param variant_out_type: A Glib.VariantType that describes the output. This 163 is the type that will be unpacked from the result. 164 @param timeout_ms: Timeout in milliseconds for this method call. 165 166 @returns: Unpacked result from the method call. 167 """ 168 if timeout_ms is None: 169 timeout_ms = GLib.MAXINT 170 171 return bus.con.call_sync(proxy._bus_name, proxy._path, iface, method, 172 variant_in_args, variant_out_type, 0, timeout_ms, 173 None).unpack() 174 175 176def unpack_if_variant(value): 177 """If given value is GLib.Variant, unpack it to the actual type.""" 178 if isinstance(value, GLib.Variant): 179 return value.unpack() 180 181 return value 182 183 184class UpstartClient: 185 """Upstart D-Bus client that allows actions on upstart targets.""" 186 187 UPSTART_MANAGER_SERVICE = 'com.ubuntu.Upstart' 188 UPSTART_MANAGER_PATH = '/com/ubuntu/Upstart' 189 UPSTART_MANAGER_IFACE = 'com.ubuntu.Upstart0_6' 190 UPSTART_JOB_IFACE = 'com.ubuntu.Upstart0_6.Job' 191 192 UPSTART_ERROR_UNKNOWNINSTANCE = ( 193 'com.ubuntu.Upstart0_6.Error.UnknownInstance') 194 UPSTART_ERROR_ALREADYSTARTED = ( 195 'com.ubuntu.Upstart0_6.Error.AlreadyStarted') 196 197 @classmethod 198 def _get_job(cls, job_name): 199 """Get job by name.""" 200 bus = pydbus.SystemBus() 201 obj = bus.get(cls.UPSTART_MANAGER_SERVICE, cls.UPSTART_MANAGER_PATH) 202 job_path = obj[cls.UPSTART_MANAGER_IFACE].GetJobByName(job_name) 203 204 return bus.get(cls.UPSTART_MANAGER_SERVICE, 205 job_path)[cls.UPSTART_JOB_IFACE] 206 207 @staticmethod 208 def _convert_instance_args(source): 209 """Convert instance args dict to array.""" 210 return ['{}={}'.format(k, v) for k, v in source.items()] 211 212 @classmethod 213 def start(cls, job_name, instance_args = {}): 214 """Starts a job. 215 216 @param job_name: Name of upstart job to start. 217 @param instance_args: Instance arguments. Will be converted to array of 218 "key=value". 219 220 @return True if job start was sent successfully. 221 """ 222 try: 223 job = cls._get_job(job_name) 224 converted_args = cls._convert_instance_args(instance_args) 225 job.Start(converted_args, True) 226 except TypeError as t: 227 # Can occur if cls._get_job fails 228 logging.error('Error starting {}: {}'.format(job_name, t)) 229 return False 230 except GLib.Error as e: 231 # An already started error is ok. All other dbus errors should 232 # return False. 233 if cls.UPSTART_ERROR_ALREADYSTARTED not in str(e): 234 logging.error('Error starting {}: {}'.format(job_name, e)) 235 return False 236 237 return True 238 239 @classmethod 240 def stop(cls, job_name, instance_args = {}): 241 """Stops a job. 242 243 @param job_name: Name of upstart job to stop. 244 @param instance_args: Instance arguments. Will be converted to 245 array of "key=value". 246 247 @return True if job stop was sent successfully. 248 """ 249 try: 250 job = cls._get_job(job_name) 251 converted_args = cls._convert_instance_args(instance_args) 252 job.Stop(converted_args, True) 253 except TypeError as t: 254 # Can occur if cls._get_job fails 255 logging.error('Error stopping {}: {}'.format(job_name, t)) 256 return False 257 except GLib.Error as e: 258 # If the job was already stopped, we will see an UnknownInstance 259 # exception. All other failure reasons should be treated as 260 # a failure to stop. 261 if cls.UPSTART_ERROR_UNKNOWNINSTANCE not in str(e): 262 logging.error('Error starting {}: {}'.format(job_name, e)) 263 return False 264 265 return True 266 267 268class BluetoothBaseFacadeLocal(object): 269 """Base facade shared by Bluez and Floss daemons. This takes care of any 270 functionality that is common across the two daemons. 271 """ 272 273 # Both bluez and floss share the same lib dir for configuration and cache 274 BLUETOOTH_LIBDIR = '/var/lib/bluetooth' 275 276 SYSLOG_LEVELS = [ 277 'EMERG', 'ALERT', 'CRIT', 'ERR', 'WARNING', 'NOTICE', 'INFO', 278 'DEBUG' 279 ] 280 281 # How long to wait for hid device 282 HID_TIMEOUT = 15 283 HID_CHECK_SECS = 2 284 285 # Due to problems transferring a date object, we convert to stringtime first 286 # This is the standard format that we will use. 287 OUT_DATE_FORMAT = '%Y-%m-%d %H:%M:%S.%f' 288 289 # Upstart job name for the Floss Manager daemon 290 MANAGER_JOB = "btmanagerd" 291 # File path for btmanagerd 292 BTMANGERD_FILE_PATH = '/usr/bin/btmanagerd' 293 # How long we wait for the manager daemon to come up after we start it 294 DAEMON_TIMEOUT_SEC = 5 295 296 # Upstart job name for ChromeOS Audio daemon 297 CRAS_JOB = "cras" 298 299 CHIPSET_TO_VIDPID = { 300 'MVL-8897': [(('0x02df', '0x912d'), 'SDIO')], 301 'MVL-8997': [(('0x1b4b', '0x2b42'), 'USB')], 302 'QCA-6174A-5-USB': [(('0x168c', '0x003e'), 'USB')], 303 'QCA-6174A-3-UART': [(('0x0271', '0x050a'), 'UART')], 304 'QCA-WCN6856': [(('0x17cb', '0x1103'), 'USB')], 305 'Intel-AX200': [(('0x8086', '0x2723'), 'USB')], # CcP2 306 'Intel-AX201': [ 307 (('0x8086', '0x02f0'), 'USB'), 308 (('0x8086', '0x4df0'), 'USB'), 309 (('0x8086', '0xa0f0'), 'USB'), 310 ], # HrP2 311 'Intel-AC9260': [(('0x8086', '0x2526'), 'USB')], # ThP2 312 'Intel-AC9560': [ 313 (('0x8086', '0x31dc'), 'USB'), # JfP2 314 (('0x8086', '0x9df0'), 'USB') 315 ], 316 'Intel-AC7260': [ 317 (('0x8086', '0x08b1'), 'USB'), # WP2 318 (('0x8086', '0x08b2'), 'USB') 319 ], 320 'Intel-AC7265': [ 321 (('0x8086', '0x095a'), 'USB'), # StP2 322 (('0x8086', '0x095b'), 'USB') 323 ], 324 'Realtek-RTL8822C-USB': [(('0x10ec', '0xc822'), 'USB')], 325 'Realtek-RTL8822C-UART': [(('0x10ec', '0xc822'), 'UART')], 326 'Realtek-RTL8852A-USB': [(('0x10ec', '0x8852'), 'USB')], 327 'Mediatek-MTK7921-USB': [(('0x14c3', '0x7961'), 'USB')], 328 'Mediatek-MTK7921-SDIO': [(('0x037a', '0x7901'), 'SDIO')] 329 330 # The following doesn't expose vid:pid 331 # 'WCN3991-UART' 332 } 333 334 def __init__(self): 335 # Initialize a messages object to record general logging. 336 self.messages = logger_helper.LogManager() 337 338 # Set up cras test client for audio tests 339 self._cras_test_client = cras_utils.CrasTestClient() 340 341 def configure_floss(self, enabled): 342 """Start and configure the Floss manager daemon. 343 344 In order to manage whether we use bluez or floss, we need to start the 345 Floss manager daemon and then set floss enabled. This exists in the base 346 implementation because bluez tests will need to start the manager to 347 disable Floss. 348 349 @param enabled: Whether to enable Floss 350 351 @return Whether Floss was configured successfully. 352 """ 353 # Start manager daemon or exit early 354 if not UpstartClient.start(self.MANAGER_JOB): 355 return False 356 357 # Since we've just started the manager daemon, we also need to recreate 358 # the client. 359 self.manager_client = FlossManagerClient(self.bus) 360 361 # Wait for the manager daemon to come up 362 try: 363 utils.poll_for_condition( 364 condition=(lambda: self.manager_client.has_proxy()), 365 desc='Wait for manager daemon to come up', 366 sleep_interval=0.5, 367 timeout=self.DAEMON_TIMEOUT_SEC) 368 except Exception as e: 369 logging.error('timeout: error starting manager daemon: %s', e) 370 371 # We need to observe callbacks for proper operation. 372 if not self.manager_client.register_callbacks(): 373 logging.error('manager_client: Failed to register callbacks') 374 return False 375 376 # Floss may not yet be enabled so make sure to enable it here. 377 if self.manager_client.get_floss_enabled() != enabled: 378 self.manager_client.set_floss_enabled(enabled) 379 default_adapter = self.manager_client.get_default_adapter() 380 try: 381 utils.poll_for_condition( 382 condition=(lambda: self.manager_client. 383 get_adapter_enabled(default_adapter 384 ) == enabled), 385 desc='Wait for set floss enabled to complete', 386 sleep_interval=0.5, 387 timeout=self.DAEMON_TIMEOUT_SEC) 388 except Exception as e: 389 logging.error('timeout: error waiting for set_floss_enabled') 390 391 # Also configure cras to enable/disable floss 392 self.configure_cras_floss(enabled) 393 394 return True 395 396 def configure_cras_floss(self, enabled): 397 """Configure whether CRAS has floss enabled.""" 398 cras_utils.set_floss_enabled(enabled) 399 400 def _restart_cras(self, enable_floss=False): 401 """Restarts CRAS and sets whether Floss is enabled.""" 402 UpstartClient.stop(self.CRAS_JOB) 403 started = UpstartClient.start(self.CRAS_JOB) 404 405 def _set_floss(): 406 try: 407 self.configure_cras_floss(enable_floss) 408 return True 409 except: 410 return False 411 412 try: 413 if started: 414 utils.poll_for_condition( 415 condition=_set_floss, 416 desc='Wait for CRAS to come up and configure floss', 417 sleep_interval=1, 418 timeout=self.DAEMON_TIMEOUT_SEC) 419 except Exception as e: 420 logging.error('timeout: error waiting to set floss on cras') 421 return False 422 423 # Did we successfully start the cras daemon? 424 return started 425 426 def log_message(self, msg): 427 """ log a message to /var/log/messages.""" 428 try: 429 cmd = ['logger', msg] 430 subprocess.call(cmd) 431 except Exception as e: 432 logging.error("log_message %s failed with %s", cmd, str(e)) 433 434 def messages_start(self): 435 """Start messages monitoring. 436 437 @returns: True if logging started successfully, else False 438 """ 439 440 try: 441 self.messages.StartRecording() 442 return True 443 444 except Exception as e: 445 logging.error('Failed to start log recording with error: %s', e) 446 447 return False 448 449 def messages_stop(self): 450 """Stop messages monitoring. 451 452 @returns: True if logs were successfully gathered since logging started, 453 else False 454 """ 455 try: 456 self.messages.StopRecording() 457 return True 458 459 except Exception as e: 460 logging.error('Failed to stop log recording with error: %s', e) 461 462 return False 463 464 def messages_find(self, pattern_str): 465 """Find if a pattern string exists in messages output. 466 467 @param pattern_str: the pattern string to find. 468 469 @returns: True on success. False otherwise. 470 471 """ 472 return self.messages.LogContains(pattern_str) 473 474 def clean_bluetooth_kernel_log(self, log_level): 475 """Remove Bluetooth kernel logs in /var/log/messages with loglevel 476 equal to or greater than |log_level| 477 478 @param log_level: int in range [0..7] 479 """ 480 reg_exp = '[^ ]+ ({LEVEL}) kernel: \[.*\] Bluetooth: .*'.format( 481 LEVEL='|'.join(self.SYSLOG_LEVELS[log_level:])) 482 483 logging.debug('Set kernel filter to level %d', log_level) 484 485 self.messages.FilterOut(reg_exp) 486 487 def _encode_base64_json(self, data): 488 """Base64 encode and json encode the data. 489 Required to handle non-ascii data 490 491 @param data: data to be base64 and JSON encoded 492 493 @return: base64 and JSON encoded data 494 495 """ 496 logging.debug('_encode_base64_json raw data is %s', data) 497 b64_encoded = utils.base64_recursive_encode(data) 498 logging.debug('base64 encoded data is %s', b64_encoded) 499 json_encoded = json.dumps(b64_encoded) 500 logging.debug('JSON encoded data is %s', json_encoded) 501 return json_encoded 502 503 def is_wrt_supported(self): 504 """Check if Bluetooth adapter support WRT logs 505 506 WRT is supported on Intel adapters other than (StP2 and WP2) 507 508 @returns : True if adapter is Intel made. 509 """ 510 # Dict of Intel Adapters that support WRT and vid:pid 511 vid_pid_dict = { 512 'HrP2': '8086:02f0', 513 'ThP2': '8086:2526', 514 'JfP2': '8086:31dc', 515 'JfP2-2': '8086:9df0' 516 } # On Sarien/Arcada 517 518 def _get_lspci_vid_pid(output): 519 """ parse output of lspci -knn and get the vid:pid 520 521 output is of the form '01:00.0 Network controller [0280]: 522 \Intel Corporation Device [8086:2526] (rev 29)\n' 523 524 @returns : 'vid:pid' or None 525 """ 526 try: 527 for i in output.split(b'\n'): 528 if 'Network controller' in i.decode('utf-8'): 529 logging.debug('Got line %s', i) 530 if 'Intel Corporation' in i.decode('utf-8'): 531 return i.split(b'[')[2].split(b']')[0] 532 return None 533 except Exception as e: 534 logging.debug('Exception in _get_lspci_vidpid %s', str(e)) 535 return None 536 537 try: 538 cmd = ['lspci', '-knn'] 539 output = subprocess.check_output(cmd, encoding='UTF-8') 540 vid_pid = _get_lspci_vid_pid(output) 541 logging.debug("got vid_pid %s", vid_pid) 542 if vid_pid is not None: 543 if vid_pid in list(vid_pid_dict.values()): 544 return True 545 except Exception as e: 546 logging.error('is_intel_adapter failed with %s', cmd, str(e)) 547 return False 548 549 def enable_wrt_logs(self): 550 """ Enable WRT logs for Intel Bluetooth adapters. 551 552 This is applicable only to Intel adapters. 553 Execute a series of custom hciconfig commands to 554 setup WRT log collection 555 556 Precondition : 557 1) Check if the DUT has Intel controller other than StP2 558 2) Make sure the controller is powered on 559 """ 560 fw_trace_cmd = ( 561 'hcitool cmd 3f 7c 01 10 00 00 00 FE 81 02 80 04 00 00' 562 ' 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00' 563 ' 00 00 00 00 00 00 00') 564 ddc_read_cmd = 'hcitool cmd 3f 8c 28 01' 565 ddc_write_cmd_prefix = 'hcitool cmd 3f 8b 03 28 01' 566 hw_trace_cmd = ( 567 'hcitool cmd 3f 6f 01 08 00 00 00 00 00 00 00 00 01 00' 568 ' 00 03 01 03 03 03 10 03 6A 0A 6A 0A 6A 0A 6A 0A 00 00' 569 ' 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00' 570 ' 00 00 00 00 00 00') 571 multi_comm_trace_str = ('000000F600000000005002000000003F3F3F3' 572 'F3F003F000000000000000001000000000000000000' 573 '000000000000000000000000000000000000000000' 574 '00000000000000000000000000000000000000000' 575 '00000000000000000') 576 multi_comm_trace_file = ('/sys/kernel/debug/ieee80211' 577 '/phy0/iwlwifi/iwlmvm/send_hcmd') 578 579 def _execute_cmd(cmd_str, msg=''): 580 """Wrapper around subprocess.check_output. 581 582 @params cmd: Command to be executed as a string 583 @params msg: Optional description of the command 584 585 @returns: (True, output) if execution succeeded 586 (False, None) if execution failed 587 588 """ 589 try: 590 logging.info('Executing %s cmd', msg) 591 cmd = cmd_str.split(' ') 592 logging.debug('command is "%s"', cmd) 593 output = subprocess.check_output(cmd, enconding='UTF-8') 594 logging.info('%s cmd successfully executed', msg) 595 logging.debug('output is %s', output) 596 return (True, output) 597 except Exception as e: 598 logging.error('Exception %s while executing %s command', 599 str(e), msg) 600 return (False, None) 601 602 def _get_ddc_write_cmd(ddc_read_result, ddc_write_cmd_prefix): 603 """ Create ddc_write_cmd from read command 604 605 This function performs the following 606 1) Take the output of ddc_read_cmd which is in following form 607 '< HCI Command: ogf 0x3f, ocf 0x008c, plen 1\n 608 01 \n> 609 HCI Event: 0x0e plen 6\n 01 8C FC 12 00 18 \n' 610 2) Take the last value of the output 611 01 8C FC 12 00 ===>> 18 <==== 612 3) Bitwise or with 0x40 613 0x18 | 0x40 = 0x58 614 4) Add it to the end of the ddc_write_cmd 615 'hcitool 01 8C FC 00 28 01 ===> 58 <====' 616 617 """ 618 last_line = [ 619 i for i in ddc_read_result.strip().split(b'\n') if i != '' 620 ][-1] 621 last_byte = [i for i in last_line.split(b' ') if i != ''][-1] 622 processed_byte = hex(int(last_byte, 16) | 0x40).split('0x')[1] 623 cmd = ddc_write_cmd_prefix + ' ' + processed_byte 624 logging.debug('ddc_write_cmd is %s', cmd) 625 return cmd 626 627 try: 628 logging.info('Enabling WRT logs') 629 status, _ = _execute_cmd(fw_trace_cmd, 'FW trace cmd') 630 if not status: 631 logging.info('FW trace command execution failed') 632 return False 633 634 status, ddc_read_result = _execute_cmd(ddc_read_cmd, 'DDC Read') 635 if not status: 636 logging.info('DDC Read command execution failed') 637 return False 638 639 ddc_write_cmd = _get_ddc_write_cmd(ddc_read_result, 640 ddc_write_cmd_prefix) 641 logging.debug('DDC Write command is %s', ddc_write_cmd) 642 status, _ = _execute_cmd(ddc_write_cmd, 'DDC Write') 643 if not status: 644 logging.info('DDC Write commanad execution failed') 645 return False 646 647 status, hw_trace_result = _execute_cmd(hw_trace_cmd, 'HW trace') 648 if not status: 649 logging.info('HW Trace command execution failed') 650 return False 651 652 logging.debug('Executing the multi_comm_trace cmd %s to file %s', 653 multi_comm_trace_str, multi_comm_trace_file) 654 with open(multi_comm_trace_file, 'w') as f: 655 f.write(multi_comm_trace_str + '\n') 656 f.flush() 657 658 logging.info('WRT Logs enabled') 659 return True 660 except Exception as e: 661 logging.error('Exception %s while enabling WRT logs', str(e)) 662 return False 663 664 def collect_wrt_logs(self): 665 """Collect the WRT logs for Intel Bluetooth adapters 666 667 This is applicable only to Intel adapters. 668 Execute following command to collect WRT log. The logs are 669 copied to /var/spool/crash/ 670 671 'echo 1 > sudo tee /sys/kernel/debug/ieee80211/phy0' 672 '/iwlwifi/iwlmvm/fw_dbg_collect' 673 This is to be called only after enable_wrt_logs is called 674 675 676 Precondition: 677 1) enable_wrt_logs has been called 678 """ 679 680 def _collect_logs(): 681 """Execute command to collect wrt logs.""" 682 try: 683 with open( 684 '/sys/kernel/debug/ieee80211/phy0/iwlwifi/' 685 'iwlmvm/fw_dbg_collect', 'w') as f: 686 f.write('1') 687 f.flush() 688 # There is some flakiness in log collection. This sleep 689 # is due to the flakiness 690 time.sleep(10) 691 return True 692 except Exception as e: 693 logging.error('Exception %s in _collect logs ', str(e)) 694 return False 695 696 def _get_num_log_files(): 697 """Return number of WRT log files.""" 698 try: 699 return len(glob.glob('/var/spool/crash/devcoredump_iwlwifi*')) 700 except Exception as e: 701 logging.debug('Exception %s raised in _get_num_log_files', 702 str(e)) 703 return 0 704 705 try: 706 logging.info('Collecting WRT logs') 707 # 708 # The command to trigger the logs does seems to work always. 709 # As a workaround for this flakiness, execute it multiple times 710 # until a new log is created 711 # 712 num_logs_present = _get_num_log_files() 713 logging.debug('%s logs present', num_logs_present) 714 for i in range(10): 715 time.sleep(1) 716 logging.debug('Executing command to collect WRT logs ') 717 if _collect_logs(): 718 logging.debug('Command to collect WRT logs executed') 719 else: 720 logging.debug('Command to collect WRT logs failed') 721 continue 722 723 if _get_num_log_files() > num_logs_present: 724 logging.info('Successfully collected WRT logs ') 725 return True 726 else: 727 logging.debug('Log file not written. Trying again') 728 729 logging.info('Unable to collect WRT logs') 730 return False 731 except Exception as e: 732 logging.error('Exception %s while collecting WRT logs', str(e)) 733 return False 734 735 def _get_wake_enabled_path(self): 736 # Walk up the parents from hci0 sysfs path and find the first one with 737 # a power/wakeup property. Return that path (including power/wakeup). 738 739 # Resolve hci path to get full device path (i.e. w/ usb or uart) 740 search_at = os.path.realpath('/sys/class/bluetooth/hci0') 741 742 # Exit early if path doesn't exist 743 if not os.path.exists(search_at): 744 return None 745 746 # Walk up parents and try to find one with 'power/wakeup' 747 for _ in range(search_at.count('/') - 1): 748 search_at = os.path.normpath(os.path.join(search_at, '..')) 749 try: 750 path = os.path.join(search_at, 'power', 'wakeup') 751 with open(path, 'r') as f: 752 return path 753 except IOError: 754 # No power wakeup at the given location so keep going 755 continue 756 757 return None 758 759 def _is_wake_enabled(self): 760 search_at = self._get_wake_enabled_path() 761 762 if search_at is not None: 763 try: 764 with open(search_at, 'r') as f: 765 value = f.read() 766 logging.info('Power/wakeup found at {}: {}'.format( 767 search_at, value)) 768 return 'enabled' in value 769 except IOError: 770 # Path was not readable 771 return False 772 773 logging.debug('No power/wakeup path found') 774 return False 775 776 def _set_wake_enabled(self, value): 777 path = self._get_wake_enabled_path() 778 if path is not None: 779 try: 780 with open(path, 'w') as f: 781 f.write('enabled' if value else 'disabled') 782 return True 783 except IOError: 784 # Path was not writeable 785 return False 786 787 return False 788 789 def is_wake_enabled(self): 790 """Checks whether the bluetooth adapter has wake enabled. 791 792 This will walk through all parents of the hci0 sysfs path and try to 793 find one with a 'power/wakeup' entry and returns whether its value is 794 'enabled'. 795 796 @return True if 'power/wakeup' of an hci0 parent is 'enabled' 797 """ 798 enabled = self._is_wake_enabled() 799 return enabled 800 801 def set_wake_enabled(self, value): 802 """Sets wake enabled to the value if path exists. 803 804 This will walk through all parents of the hci0 sysfs path and write the 805 value to the first one it finds. 806 807 @param value: Sets power/wakeup to "enabled" if value is true, else 808 "disabled" 809 810 @return True if it wrote value to a power/wakeup, False otherwise 811 """ 812 return self._set_wake_enabled(value) 813 814 def wait_for_hid_device(self, device_address, timeout, sleep_interval): 815 """Waits for hid device with given device address. 816 817 @param device_address: Peripheral address 818 @param timeout: maximum number of seconds to wait 819 @param sleep_interval: time to sleep between polls 820 821 @return True if hid device found, False otherwise 822 """ 823 824 def _match_hid_to_device(hidpath, device_address): 825 """Check if given hid syspath is for the given device address """ 826 # If the syspath has a uniq property that matches the peripheral 827 # device's address, then it has matched 828 props = UdevadmInfo.GetProperties(hidpath) 829 if (props.get(b'uniq', b'').lower().decode() == device_address): 830 logging.info('Found hid device for address {} at {}'.format( 831 device_address, hidpath)) 832 return True 833 else: 834 logging.info('Path {} is not right device.'.format(hidpath)) 835 836 return False 837 838 def _hid_is_created(device_address): 839 existing_inputs = UdevadmTrigger( 840 subsystem_match=['input']).DryRun() 841 for entry in existing_inputs: 842 entry = entry.decode() 843 bt_hid = any([t in entry for t in ['uhid', 'hci']]) 844 logging.info('udevadm trigger entry is {}: {}'.format( 845 bt_hid, entry)) 846 847 if (bt_hid and _match_hid_to_device(entry, 848 device_address.lower())): 849 return True 850 851 return False 852 853 if timeout is None: 854 timeout = self.HID_TIMEOUT 855 if sleep_interval is None: 856 sleep_interval = self.HID_CHECK_SECS 857 858 method_name = 'wait_for_hid_device' 859 try: 860 utils.poll_for_condition( 861 condition=(lambda: _hid_is_created(device_address)), 862 timeout=timeout, 863 sleep_interval=sleep_interval, 864 desc=('Waiting for HID device to be created from %s' % 865 device_address)) 866 return True 867 except utils.TimeoutError as e: 868 logging.error('%s: %s', method_name, e) 869 except Exception as e: 870 logging.error('%s: unexpected error: %s', method_name, e) 871 872 return False 873 874 def _powerd_last_resume_details(self, before=5, after=0): 875 """ Look at powerd logs for last suspend/resume attempt. 876 877 Note that logs are in reverse order (chronologically). Keep that in mind 878 for the 'before' and 'after' parameters. 879 880 @param before: Number of context lines before search item to show. 881 @param after: Number of context lines after search item to show. 882 883 @return Most recent lines containing suspend resume details or ''. 884 """ 885 event_file = '/var/log/power_manager/powerd.LATEST' 886 887 # Each powerd_suspend wakeup has a log "powerd_suspend returned 0", 888 # with the return code of the suspend. We search for the last 889 # occurrence in the log, and then find the collocated event_count log, 890 # indicating the wakeup cause. -B option for grep will actually grab the 891 # *next* 5 logs in time, since we are piping the powerd file backwards 892 # with tac command 893 resume_indicator = 'powerd_suspend returned' 894 cmd = 'tac {} | grep -A {} -B {} -m1 "{}"'.format( 895 event_file, after, before, resume_indicator) 896 897 try: 898 return utils.run(cmd).stdout 899 except error.CmdError: 900 logging.error('Could not locate recent suspend') 901 902 return '' 903 904 def bt_caused_last_resume(self): 905 """Checks if last resume from suspend was caused by bluetooth 906 907 @return: True if BT wake path was cause of resume, False otherwise 908 """ 909 910 # When the resume cause is printed to powerd log, it omits the 911 # /power/wakeup portion of wake path 912 bt_wake_path = self._get_wake_enabled_path() 913 914 # If bluetooth does not have a valid wake path, it could not have caused 915 # the resume 916 if not bt_wake_path: 917 return False 918 919 bt_wake_path = bt_wake_path.replace('/power/wakeup', '') 920 921 last_resume_details = self._powerd_last_resume_details().rstrip( 922 '\n ').split('\n') 923 logging.debug('/var/log/power_manager/powerd.LATEST: 5 lines after ' 924 'powerd_suspend returns:') 925 for l in last_resume_details[::-1]: 926 logging.debug(l) 927 # If BT caused wake, there will be a line describing the bt wake 928 # path's event_count before and after the resume 929 for line in last_resume_details: 930 if 'event_count' in line: 931 logging.info('Checking wake event: {}'.format(line)) 932 if bt_wake_path in line: 933 logging.debug('BT event woke the DUT') 934 return True 935 936 return False 937 938 def find_last_suspend_via_powerd_logs(self): 939 """ Finds the last suspend attempt via powerd logs. 940 941 Finds the last suspend attempt using powerd logs by searching backwards 942 through the logs to find the latest entries with 'powerd_suspend'. If we 943 can't find a suspend attempt, we return None. 944 945 @return: Tuple (suspend start time, suspend end time, suspend result) or 946 None if we can't find a suspend attempt 947 """ 948 # Logs look like this (ignore newline): 949 # 2021-02-11T18:53:43.561880Z INFO powerd: 950 # [daemon.cc(724)] powerd_suspend returned 0 951 # ... stuff in between ... 952 # 2021-02-11T18:53:13.277695Z INFO powerd: 953 # [suspender.cc(574)] Starting suspend 954 955 # Date format for strptime and strftime 956 date_format = '%Y-%m-%dT%H:%M:%S.%fZ' 957 date_group_re = ('(?P<date>[0-9]+-[0-9]+-[0-9]+T' 958 '[0-9]+:[0-9]+:[0-9]+[.][0-9]+Z)\s') 959 960 finish_suspend_re = re.compile( 961 '^{date_regex}' 962 '.*daemon.*powerd_suspend returned ' 963 '(?P<exitcode>[0-9]+)'.format(date_regex=date_group_re)) 964 start_suspend_re = re.compile( 965 '^{date_regex}.*suspender.*' 966 'Starting suspend'.format(date_regex=date_group_re)) 967 968 now = datetime.now() 969 last_resume_details = self._powerd_last_resume_details(before=0, 970 after=8) 971 if last_resume_details: 972 start_time, end_time, ret = None, None, None 973 try: 974 for line in last_resume_details.split('\n'): 975 logging.debug('Last suspend search: %s', line) 976 m = finish_suspend_re.match(line) 977 if m: 978 logging.debug('Found suspend end: date(%s) ret(%s)', 979 m.group('date'), m.group('exitcode')) 980 end_time = datetime.strptime( 981 m.group('date'), 982 date_format).replace(year=now.year) 983 ret = int(m.group('exitcode')) 984 985 m = start_suspend_re.match(line) 986 if m: 987 logging.debug('Found suspend start: date(%s)', 988 m.group('date')) 989 start_time = datetime.strptime( 990 m.group('date'), 991 date_format).replace(year=now.year) 992 break 993 994 if all([x is not None for x in [start_time, end_time, ret]]): 995 # Return dates in string format due to inconsistency between 996 # python2/3 usage on host and dut 997 return (start_time.strftime(self.OUT_DATE_FORMAT), 998 end_time.strftime(self.OUT_DATE_FORMAT), ret) 999 else: 1000 logging.error( 1001 'Failed to parse details from last suspend. %s %s %s', 1002 str(start_time), str(end_time), str(ret)) 1003 except Exception as e: 1004 logging.error('Failed to parse last suspend: %s', str(e)) 1005 else: 1006 logging.error('No powerd_suspend attempt found') 1007 1008 return None 1009 1010 def do_suspend(self, seconds, expect_bt_wake): 1011 """Suspend DUT using the power manager. 1012 1013 @param seconds: The number of seconds to suspend the device. 1014 @param expect_bt_wake: Whether we expect bluetooth to wake us from 1015 suspend. If true, we expect this resume will occur early 1016 1017 @throws: SuspendFailure on resume with unexpected timing or wake source. 1018 The raised exception will be handled as a non-zero retcode over the 1019 RPC, signalling for the test to fail. 1020 """ 1021 early_wake = False 1022 try: 1023 sys_power.do_suspend(seconds) 1024 1025 except sys_power.SpuriousWakeupError: 1026 logging.info('Early resume detected...') 1027 early_wake = True 1028 1029 # Handle error conditions based on test expectations, whether resume 1030 # was early, and cause of the resume 1031 bt_caused_wake = self.bt_caused_last_resume() 1032 logging.info('Cause for resume: {}'.format( 1033 'BT' if bt_caused_wake else 'Not BT')) 1034 1035 if not expect_bt_wake and bt_caused_wake: 1036 raise sys_power.SuspendFailure('BT woke us unexpectedly') 1037 1038 # TODO(b/160803597) - Uncomment when BT wake reason is correctly 1039 # captured in powerd log. 1040 # 1041 # if expect_bt_wake and not bt_caused_wake: 1042 # raise sys_power.SuspendFailure('BT should have woken us') 1043 # 1044 # if bt_caused_wake and not early_wake: 1045 # raise sys_power.SuspendFailure('BT wake did not come early') 1046 1047 return True 1048 1049 def get_wlan_vid_pid(self): 1050 """ Return vendor id and product id of the wlan chip on BT/WiFi module 1051 1052 @returns: (vid,pid) on success; (None,None) on failure 1053 """ 1054 vid = None 1055 pid = None 1056 path_template = '/sys/class/net/%s/device/' 1057 for dev_name in ['wlan0', 'mlan0']: 1058 if os.path.exists(path_template % dev_name): 1059 path_v = path_template % dev_name + 'vendor' 1060 path_d = path_template % dev_name + 'device' 1061 logging.debug('Paths are %s %s', path_v, path_d) 1062 try: 1063 vid = open(path_v).read().strip('\n') 1064 pid = open(path_d).read().strip('\n') 1065 break 1066 except Exception as e: 1067 logging.error('Exception %s while reading vid/pid', str(e)) 1068 logging.debug('returning vid:%s pid:%s', vid, pid) 1069 return (vid, pid) 1070 1071 def get_bt_transport(self): 1072 """ Return transport (UART/USB/SDIO) used by BT module 1073 1074 @returns: USB/UART/SDIO on success; None on failure 1075 """ 1076 try: 1077 transport_str = os.path.realpath( 1078 '/sys/class/bluetooth/hci0/device/driver/module') 1079 logging.debug('transport is %s', transport_str) 1080 transport = transport_str.split('/')[-1] 1081 if transport == 'btusb': 1082 return 'USB' 1083 elif transport == 'hci_uart': 1084 return 'UART' 1085 elif transport in ['btmrvl_sdio', 'btmtksdio']: 1086 return 'SDIO' 1087 else: 1088 return None 1089 except Exception as e: 1090 logging.error('Exception %s in get_bt_transport', str(e)) 1091 return None 1092 1093 def get_bt_module_name(self): 1094 """ Return bluetooth module name for non-USB devices 1095 1096 @returns '' on failure. On success return chipset name, if found in 1097 dict.Otherwise it returns the raw string read. 1098 """ 1099 # map the string read from device to chipset name 1100 chipset_string_dict = { 1101 'qcom,wcn3991-bt\x00': 'WCN3991', 1102 'qcom,wcn6750-bt\x00': 'WCN6750', 1103 } 1104 1105 hci_device = '/sys/class/bluetooth/hci0' 1106 real_path = os.path.realpath(hci_device) 1107 1108 logging.debug('real path is %s', real_path) 1109 if 'usb' in real_path: 1110 return '' 1111 1112 device_path = os.path.join(real_path, 'device', 'of_node', 1113 'compatible') 1114 try: 1115 chipset_string = open(device_path).read() 1116 logging.debug('read string %s from %s', chipset_string, 1117 device_path) 1118 except Exception as e: 1119 logging.error('Exception %s while reading from file', str(e), 1120 device_path) 1121 return '' 1122 1123 if chipset_string in chipset_string_dict: 1124 return chipset_string_dict[chipset_string] 1125 else: 1126 logging.debug("Chipset not known. Returning %s", chipset_string) 1127 return chipset_string 1128 1129 def get_chipset_name(self): 1130 """ Get the name of BT/WiFi chipset on this host 1131 1132 @returns chipset name if successful else '' 1133 """ 1134 (vid, pid) = self.get_wlan_vid_pid() 1135 logging.debug('Bluetooth module vid pid is %s %s', vid, pid) 1136 transport = self.get_bt_transport() 1137 logging.debug('Bluetooth transport is %s', transport) 1138 if vid is None or pid is None: 1139 # Controllers that aren't WLAN+BT combo chips does not expose 1140 # Vendor ID/Product ID. Use alternate method. 1141 # This will return one of ['WCN3991', ''] or a string containing 1142 # the name of chipset read from DUT 1143 return self.get_bt_module_name() 1144 for name, l in self.CHIPSET_TO_VIDPID.items(): 1145 if ((vid, pid), transport) in l: 1146 return name 1147 return '' 1148 1149 def get_bt_usb_device_strs(self): 1150 """ Return the usb endpoints for the bluetooth device, if they exist 1151 1152 We wish to be able to identify usb disconnect events that affect our 1153 bluetooth operation. To do so, we must first identify the usb endpoint 1154 that is associated with our bluetooth device. 1155 1156 @returns: Relevant usb endpoints for the bluetooth device, 1157 i.e. ['1-1','1-1.2'] if they exist, 1158 [] otherwise 1159 """ 1160 1161 hci_device = '/sys/class/bluetooth/hci0' 1162 real_path = os.path.realpath(hci_device) 1163 1164 # real_path for a usb bluetooth controller will look something like: 1165 # ../../devices/pci0000:00/0000:00:14.0/usb1/1-4/1-4:1.0/bluetooth/hci0 1166 if 'usb' not in real_path: 1167 return [] 1168 1169 logging.debug('Searching for usb path: {}'.format(real_path)) 1170 1171 # Grab all numbered entries between 'usb' and 'bluetooth' descriptors 1172 m = re.search(r'usb(.*)bluetooth', real_path) 1173 1174 if not m: 1175 logging.error( 1176 'Unable to extract usb dev from {}'.format(real_path)) 1177 return [] 1178 1179 # Return the path as a list of individual usb descriptors 1180 return m.group(1).split('/') 1181 1182 def get_bt_usb_disconnect_str(self): 1183 """ Return the expected log error on USB disconnect 1184 1185 Locate the descriptor that will be used from the list of all usb 1186 descriptors associated with our bluetooth chip, and format into the 1187 expected string error for USB disconnect 1188 1189 @returns: string representing expected usb disconnect log entry if usb 1190 device could be identified, None otherwise 1191 """ 1192 disconnect_log_template = 'usb {}: USB disconnect' 1193 descriptors = self.get_bt_usb_device_strs() 1194 1195 # The usb disconnect log message seems to use the most detailed 1196 # descriptor that does not use the ':1.0' entry 1197 for d in sorted(descriptors, key=len, reverse=True): 1198 if ':' not in d: 1199 return disconnect_log_template.format(d) 1200 1201 return None 1202 1203 def get_device_utc_time(self): 1204 """ Get the current device time in UTC. """ 1205 return datetime.utcnow().strftime(self.OUT_DATE_FORMAT) 1206 1207 def create_audio_record_directory(self, audio_record_dir): 1208 """Create the audio recording directory. 1209 1210 @param audio_record_dir: the audio recording directory 1211 1212 @returns: True on success. False otherwise. 1213 """ 1214 try: 1215 if not os.path.exists(audio_record_dir): 1216 os.makedirs(audio_record_dir) 1217 return True 1218 except Exception as e: 1219 logging.error('Failed to create %s on the DUT: %s', 1220 audio_record_dir, e) 1221 return False 1222 1223 def start_capturing_audio_subprocess(self, audio_data, recording_device): 1224 """Start capturing audio in a subprocess. 1225 1226 @param audio_data: the audio test data 1227 @param recording_device: which device recorded the audio, 1228 possible values are 'recorded_by_dut' or 'recorded_by_peer' 1229 1230 @returns: True on success. False otherwise. 1231 """ 1232 audio_data = json.loads(audio_data) 1233 return self._cras_test_client.start_capturing_subprocess( 1234 audio_data[recording_device], 1235 sample_format=audio_data['format'], 1236 channels=audio_data['channels'], 1237 rate=audio_data['rate'], 1238 duration=audio_data['duration']) 1239 1240 def stop_capturing_audio_subprocess(self): 1241 """Stop capturing audio. 1242 1243 @returns: True on success. False otherwise. 1244 """ 1245 return self._cras_test_client.stop_capturing_subprocess() 1246 1247 def _generate_playback_file(self, audio_data): 1248 """Generate the playback file if it does not exist yet. 1249 1250 Some audio test files may be large. Generate them on the fly 1251 to save the storage of the source tree. 1252 1253 @param audio_data: the audio test data 1254 """ 1255 if not os.path.exists(audio_data['file']): 1256 data_format = dict(file_type='raw', 1257 sample_format='S16_LE', 1258 channel=audio_data['channels'], 1259 rate=audio_data['rate']) 1260 1261 # Make the audio file a bit longer to handle any delay 1262 # issue in capturing. 1263 duration = audio_data['duration'] + 3 1264 audio_test_data_module.GenerateAudioTestData( 1265 data_format=data_format, 1266 path=audio_data['file'], 1267 duration_secs=duration, 1268 frequencies=audio_data['frequencies']) 1269 logging.debug("Raw file generated: %s", audio_data['file']) 1270 1271 def start_playing_audio_subprocess(self, audio_data, pin_device=None): 1272 """Start playing audio in a subprocess. 1273 1274 @param audio_data: the audio test data. 1275 @param pin_device: the device id to play audio. 1276 1277 @returns: True on success. False otherwise. 1278 """ 1279 audio_data = json.loads(audio_data) 1280 self._generate_playback_file(audio_data) 1281 try: 1282 return self._cras_test_client.start_playing_subprocess( 1283 audio_data['file'], 1284 pin_device=pin_device, 1285 channels=audio_data['channels'], 1286 rate=audio_data['rate'], 1287 duration=audio_data['duration']) 1288 except Exception as e: 1289 logging.error("start_playing_subprocess() failed: %s", str(e)) 1290 return False 1291 1292 def stop_playing_audio_subprocess(self): 1293 """Stop playing audio in the subprocess. 1294 1295 @returns: True on success. False otherwise. 1296 """ 1297 return self._cras_test_client.stop_playing_subprocess() 1298 1299 def play_audio(self, audio_data): 1300 """Play audio. 1301 1302 It blocks until it has completed playing back the audio. 1303 1304 @param audio_data: the audio test data 1305 1306 @returns: True on success. False otherwise. 1307 """ 1308 audio_data = json.loads(audio_data) 1309 self._generate_playback_file(audio_data) 1310 return self._cras_test_client.play(audio_data['file'], 1311 channels=audio_data['channels'], 1312 rate=audio_data['rate'], 1313 duration=audio_data['duration']) 1314 1315 def check_audio_frames_legitimacy(self, audio_test_data, recording_device, 1316 recorded_file): 1317 """Get the number of frames in the recorded audio file. 1318 1319 @param audio_test_data: the audio test data 1320 @param recording_device: which device recorded the audio, 1321 possible values are 'recorded_by_dut' or 'recorded_by_peer' 1322 @param recorded_file: the recorded file name 1323 1324 @returns: True if audio frames are legitimate. 1325 """ 1326 if bool(recorded_file): 1327 recorded_filename = recorded_file 1328 else: 1329 audio_test_data = json.loads(audio_test_data) 1330 recorded_filename = audio_test_data[recording_device] 1331 1332 if recorded_filename.endswith('.raw'): 1333 # Make sure that the recorded file does not contain all zeros. 1334 filesize = os.path.getsize(recorded_filename) 1335 cmd_str = 'cmp -s -n %d %s /dev/zero' % (filesize, 1336 recorded_filename) 1337 try: 1338 result = subprocess.call(cmd_str.split()) 1339 return result != 0 1340 except Exception as e: 1341 logging.error("Failed: %s (%s)", cmd_str, str(e)) 1342 return False 1343 else: 1344 # The recorded wav file should not be empty. 1345 wav_file = check_quality.WaveFile(recorded_filename) 1346 return wav_file.get_number_frames() > 0 1347 1348 def convert_audio_sample_rate(self, input_file, out_file, test_data, 1349 new_rate): 1350 """Convert audio file to new sample rate. 1351 1352 @param input_file: Path to file to upsample. 1353 @param out_file: Path to create upsampled file. 1354 @param test_data: Dictionary with information about file. 1355 @param new_rate: New rate to upsample file to. 1356 1357 @returns: True if upsampling succeeded, False otherwise. 1358 """ 1359 test_data = json.loads(test_data) 1360 logging.debug('Resampling file {} to new rate {}'.format( 1361 input_file, new_rate)) 1362 1363 convert_format(input_file, 1364 test_data['channels'], 1365 test_data['bit_width'], 1366 test_data['rate'], 1367 out_file, 1368 test_data['channels'], 1369 test_data['bit_width'], 1370 new_rate, 1371 1.0, 1372 use_src_header=True, 1373 use_dst_header=True) 1374 1375 return os.path.isfile(out_file) 1376 1377 def trim_wav_file(self, 1378 in_file, 1379 out_file, 1380 new_duration, 1381 test_data, 1382 tolerance=0.1): 1383 """Trim long file to desired length. 1384 1385 Trims audio file to length by cutting out silence from beginning and 1386 end. 1387 1388 @param in_file: Path to audio file to be trimmed. 1389 @param out_file: Path to trimmed audio file to create. 1390 @param new_duration: A float representing the desired duration of 1391 the resulting trimmed file. 1392 @param test_data: Dictionary containing information about the test file. 1393 @param tolerance: (optional) A float representing the allowable 1394 difference between trimmed file length and desired duration 1395 1396 @returns: True if file was trimmed successfully, False otherwise. 1397 """ 1398 test_data = json.loads(test_data) 1399 trim_silence_from_wav_file(in_file, out_file, new_duration) 1400 measured_length = get_file_length(out_file, test_data['channels'], 1401 test_data['bit_width'], 1402 test_data['rate']) 1403 return abs(measured_length - new_duration) <= tolerance 1404 1405 def unzip_audio_test_data(self, tar_path, data_dir): 1406 """Unzip audio test data files. 1407 1408 @param tar_path: Path to audio test data tarball on DUT. 1409 @oaram data_dir: Path to directory where to extract test data directory. 1410 1411 @returns: True if audio test data folder exists, False otherwise. 1412 """ 1413 logging.debug('Downloading audio test data on DUT') 1414 # creates path to dir to extract test data to by taking name of the 1415 # tarball without the extension eg. <dir>/file.ext to data_dir/file/ 1416 audio_test_dir = os.path.join( 1417 data_dir, 1418 os.path.split(tar_path)[1].split('.', 1)[0]) 1419 1420 unzip_cmd = 'tar -xf {0} -C {1}'.format(tar_path, data_dir) 1421 1422 unzip_proc = subprocess.Popen(unzip_cmd.split(), 1423 stdout=subprocess.PIPE, 1424 stderr=subprocess.PIPE) 1425 _, stderr = unzip_proc.communicate() 1426 1427 if stderr: 1428 logging.error('Error occurred in unzipping audio data: {}'.format( 1429 str(stderr))) 1430 return False 1431 1432 return unzip_proc.returncode == 0 and os.path.isdir(audio_test_dir) 1433 1434 def convert_raw_to_wav(self, input_file, output_file, test_data): 1435 """Convert raw audio file to wav file. 1436 1437 @oaram input_file: the location of the raw file 1438 @param output_file: the location to place the resulting wav file 1439 @param test_data: the data for the file being converted 1440 1441 @returns: True if conversion was successful otherwise false 1442 """ 1443 test_data = json.loads(test_data) 1444 convert_raw_file(input_file, test_data['channels'], 1445 test_data['bit_width'], test_data['rate'], 1446 output_file) 1447 1448 return os.path.isfile(output_file) 1449 1450 def get_primary_frequencies(self, audio_test_data, recording_device, 1451 recorded_file): 1452 """Get primary frequencies of the audio test file. 1453 1454 @param audio_test_data: the audio test data 1455 @param recording_device: which device recorded the audio, 1456 possible values are 'recorded_by_dut' or 'recorded_by_peer' 1457 @param recorded_file: the recorded file name 1458 1459 @returns: a list of primary frequencies of channels in the audio file 1460 """ 1461 audio_test_data = json.loads(audio_test_data) 1462 1463 if bool(recorded_file): 1464 recorded_filename = recorded_file 1465 else: 1466 recorded_filename = audio_test_data[recording_device] 1467 1468 args = CheckQualityArgsClass(filename=recorded_filename, 1469 rate=audio_test_data['rate'], 1470 channel=audio_test_data['channels'], 1471 bit_width=16) 1472 raw_data, rate = check_quality.read_audio_file(args) 1473 checker = check_quality.QualityChecker(raw_data, rate) 1474 # The highest frequency recorded would be near 24 Khz 1475 # as the max sample rate is 48000 in our tests. 1476 # So let's set ignore_high_freq to be 48000. 1477 checker.do_spectral_analysis(ignore_high_freq=48000, 1478 check_quality=False, 1479 quality_params=None) 1480 spectra = checker._spectrals 1481 primary_freq = [ 1482 float(spectra[i][0][0]) if spectra[i] else 0 1483 for i in range(len(spectra)) 1484 ] 1485 primary_freq.sort() 1486 return primary_freq 1487 1488 def enable_wbs(self, value): 1489 """Enable or disable wideband speech (wbs) per the value. 1490 1491 @param value: True to enable wbs. 1492 1493 @returns: True if the operation succeeds. 1494 """ 1495 return self._cras_test_client.enable_wbs(value) 1496 1497 def set_player_playback_status(self, status): 1498 """Set playback status for the registered media player. 1499 1500 @param status: playback status in string. 1501 1502 """ 1503 return self._cras_test_client.set_player_playback_status(status) 1504 1505 def set_player_position(self, position): 1506 """Set media position for the registered media player. 1507 1508 @param position: position in micro seconds. 1509 1510 """ 1511 return self._cras_test_client.set_player_position(position) 1512 1513 def set_player_metadata(self, metadata): 1514 """Set metadata for the registered media player. 1515 1516 @param metadata: dictionary of media metadata. 1517 1518 """ 1519 return self._cras_test_client.set_player_metadata(metadata) 1520 1521 def set_player_length(self, length): 1522 """Set media length for the registered media player. 1523 1524 Media length is a part of metadata information. However, without 1525 specify its type to int64. dbus-python will guess the variant type to 1526 be int32 by default. Separate it from the metadata function to help 1527 prepare the data differently. 1528 1529 @param length: length in micro seconds. 1530 1531 """ 1532 return self._cras_test_client.set_player_length(length) 1533 1534 def select_input_device(self, device_name): 1535 """Select the audio input device. 1536 1537 @param device_name: the name of the Bluetooth peer device 1538 1539 @returns: True if the operation succeeds. 1540 """ 1541 return self._cras_test_client.select_input_device(device_name) 1542 1543 @dbus_safe(None) 1544 def select_output_node(self, node_type): 1545 """Select the audio output node. 1546 1547 @param node_type: the node type of the Bluetooth peer device 1548 1549 @returns: True if the operation succeeds. 1550 """ 1551 return cras_utils.set_single_selected_output_node(node_type) 1552 1553 @dbus_safe(None) 1554 def get_selected_output_device_type(self): 1555 """Get the selected audio output node type. 1556 1557 @returns: the node type of the selected output device. 1558 """ 1559 # Note: should convert the dbus.String to the regular string. 1560 return str(cras_utils.get_selected_output_device_type()) 1561 1562 @dbus_safe(None) 1563 def get_device_id_from_node_type(self, node_type, is_input): 1564 """Gets device id from node type. 1565 1566 @param node_type: a node type defined in CRAS_NODE_TYPES. 1567 @param is_input: True if the node is input. False otherwise. 1568 1569 @returns: a string for device id. 1570 """ 1571 return cras_utils.get_device_id_from_node_type(node_type, is_input) 1572 1573 def get_audio_thread_summary(self): 1574 """Dumps audio thread info. 1575 1576 @returns: a list of cras audio information. 1577 """ 1578 return cras_utils.get_audio_thread_summary() 1579 1580 def is_btmanagerd_present(self): 1581 """ Check if /usr/bin/btmanagerd file is present 1582 1583 @returns: True if /usr/bin/btmanagerd is present and False if not 1584 """ 1585 return os.path.exists(self.BTMANGERD_FILE_PATH) 1586 1587 1588class BluezPairingAgent: 1589 """The agent handling the authentication process of bluetooth pairing. 1590 1591 BluezPairingAgent overrides RequestPinCode method to return a given pin code. 1592 User can use this agent to pair bluetooth device which has a known 1593 pin code. 1594 1595 TODO (josephsih): more pairing modes other than pin code would be 1596 supported later. 1597 1598 """ 1599 1600 def __init__(self, bus, path, pin): 1601 """Constructor. 1602 1603 @param bus: system bus object. 1604 @param path: Object path to register. 1605 @param pin: Pin to respond with for |RequestPinCode|. 1606 """ 1607 self._pin = pin 1608 self.path = path 1609 self.obj = bus.register_object(path, self, None) 1610 1611 # D-Bus service definition (required by pydbus). 1612 dbus = """ 1613 <node> 1614 <interface name="org.bluez.Agent1"> 1615 <method name="RequestPinCode"> 1616 <arg type="o" name="device_path" direction="in" /> 1617 <arg type="s" name="response" direction="out" /> 1618 </method> 1619 <method name="AuthorizeService"> 1620 <arg type="o" name="device_path" direction="in" /> 1621 <arg type="s" name="uuid" direction="in" /> 1622 <arg type="b" name="response" direction="out" /> 1623 </method> 1624 </interface> 1625 </node> 1626 """ 1627 1628 def unregister(self): 1629 """Unregisters self from bus.""" 1630 self.obj.unregister() 1631 1632 def RequestPinCode(self, device_path): 1633 """Requests pin code for a device. 1634 1635 Returns the known pin code for the request. 1636 1637 @param device_path: The object path of the device. 1638 1639 @returns: The known pin code. 1640 1641 """ 1642 logging.info('RequestPinCode for %s; return %s', device_path, 1643 self._pin) 1644 return self._pin 1645 1646 def AuthorizeService(self, device_path, uuid): 1647 """Authorize given service for device. 1648 1649 @param device_path: The object path of the device. 1650 @param uuid: The service that needs to be authorized. 1651 1652 @returns: True (we authorize everything since this is a test) 1653 """ 1654 return True 1655 1656 1657class BluezFacadeLocal(BluetoothBaseFacadeLocal): 1658 """Exposes DUT methods called remotely during Bluetooth autotests for the 1659 Bluez daemon. 1660 1661 All instance methods of this object without a preceding '_' are exposed via 1662 an XML-RPC server. This is not a stateless handler object, which means that 1663 if you store state inside the delegate, that state will remain around for 1664 future calls. 1665 """ 1666 1667 BLUETOOTHD_JOB = 'bluetoothd' 1668 1669 DBUS_ERROR_SERVICEUNKNOWN = 'org.freedesktop.DBus.Error.ServiceUnknown' 1670 1671 BLUEZ_SERVICE_NAME = 'org.bluez' 1672 BLUEZ_MANAGER_PATH = '/' 1673 BLUEZ_DEBUG_LOG_PATH = '/org/chromium/Bluetooth' 1674 BLUEZ_DEBUG_LOG_IFACE = 'org.chromium.Bluetooth.Debug' 1675 BLUEZ_MANAGER_IFACE = 'org.freedesktop.DBus.ObjectManager' 1676 BLUEZ_ADAPTER_IFACE = 'org.bluez.Adapter1' 1677 BLUEZ_ADMIN_POLICY_SET_IFACE = 'org.bluez.AdminPolicySet1' 1678 BLUEZ_ADMIN_POLICY_STATUS_IFACE = 'org.bluez.AdminPolicyStatus1' 1679 BLUEZ_BATTERY_IFACE = 'org.bluez.Battery1' 1680 BLUEZ_DEVICE_IFACE = 'org.bluez.Device1' 1681 BLUEZ_GATT_SERV_IFACE = 'org.bluez.GattService1' 1682 BLUEZ_GATT_CHAR_IFACE = 'org.bluez.GattCharacteristic1' 1683 BLUEZ_GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' 1684 BLUEZ_LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' 1685 BLUEZ_ADV_MONITOR_MANAGER_IFACE = 'org.bluez.AdvertisementMonitorManager1' 1686 BLUEZ_AGENT_MANAGER_PATH = '/org/bluez' 1687 BLUEZ_AGENT_MANAGER_IFACE = 'org.bluez.AgentManager1' 1688 BLUEZ_PROFILE_MANAGER_PATH = '/org/bluez' 1689 BLUEZ_PROFILE_MANAGER_IFACE = 'org.bluez.ProfileManager1' 1690 BLUEZ_ERROR_ALREADY_EXISTS = 'org.bluez.Error.AlreadyExists' 1691 BLUEZ_PLUGIN_DEVICE_IFACE = 'org.chromium.BluetoothDevice' 1692 DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' 1693 AGENT_PATH = '/test/agent' 1694 1695 BTMON_STOP_DELAY_SECS = 3 1696 1697 # Timeout for how long we'll wait for BlueZ and the Adapter to show up 1698 # after reset. 1699 ADAPTER_TIMEOUT = 30 1700 1701 # How long we should wait for property update signal before we cancel it. 1702 PROPERTY_UPDATE_TIMEOUT_MILLI_SECS = 5000 1703 1704 # How often we should check for property update exit. 1705 PROPERTY_UPDATE_CHECK_MILLI_SECS = 500 1706 1707 def __init__(self): 1708 # Init the BaseFacade first 1709 super(BluezFacadeLocal, self).__init__() 1710 1711 # Open the Bluetooth Raw socket to the kernel which provides us direct, 1712 # raw, access to the HCI controller. 1713 self._raw = bluetooth_socket.BluetoothRawSocket() 1714 1715 # Open the Bluetooth Control socket to the kernel which provides us 1716 # raw management access to the Bluetooth Host Subsystem. Read the list 1717 # of adapter indexes to determine whether or not this device has a 1718 # Bluetooth Adapter or not. 1719 self._control = bluetooth_socket.BluetoothControlSocket() 1720 self._has_adapter = len(self._control.read_index_list()) > 0 1721 1722 # Create an Advertisement Monitor App Manager instance. 1723 # This needs to be created before making any dbus connections as 1724 # AdvMonitorAppMgr internally forks a new helper process and due to 1725 # a limitation of python, it is not possible to fork a new process 1726 # once any dbus connections are established. 1727 self.advmon_appmgr = adv_monitor_helper.AdvMonitorAppMgr() 1728 1729 # Set up the connection to the D-Bus System Bus, get the object for 1730 # the Bluetooth Userspace Daemon (BlueZ) and that daemon's object for 1731 # the Bluetooth Adapter, and the advertising manager. 1732 self.bus = pydbus.SystemBus() 1733 self._update_bluez() 1734 self._update_adapter() 1735 self._update_advertising() 1736 self._update_adv_monitor_manager() 1737 1738 # The agent to handle pin code request, which will be 1739 # created when user calls pair_legacy_device method. 1740 self._pairing_agent = None 1741 # The default capability of the agent. 1742 self._capability = 'KeyboardDisplay' 1743 1744 # Initialize a btmon object to record bluetoothd's activity. 1745 self.btmon = output_recorder.OutputRecorder( 1746 ['btmon', '-c', 'never'], 1747 stop_delay_secs=self.BTMON_STOP_DELAY_SECS) 1748 1749 self.advertisements = [] 1750 self.advmon_interleave_logger = logger_helper.InterleaveLogger() 1751 self._chrc_property = None 1752 self._timeout_id = 0 1753 self._signal_watch = None 1754 self._dbus_mainloop = GObject.MainLoop() 1755 1756 @dbus_safe(False) 1757 def set_debug_log_levels(self, bluez_vb, kernel_vb): 1758 """Enable or disable the debug logs of bluetooth 1759 1760 @param bluez_vb: verbosity of bluez debug log, either 0 or 1 1761 @param kernel_vb: verbosity of kernel debug log, either 0 or 1 1762 1763 """ 1764 debug_object = self.bus.get(self.BLUEZ_SERVICE_NAME, 1765 self.BLUEZ_DEBUG_LOG_PATH) 1766 1767 # Make a raw synchronous call using GLib (pydbus doesn't correctly 1768 # serialize '(yy)'. 1769 raw_dbus_call_sync(self.bus, debug_object, self.BLUEZ_DEBUG_LOG_IFACE, 1770 'SetLevels', 1771 GLib.Variant('(yy)', (bluez_vb, kernel_vb)), 1772 GLib.VariantType.new('()')) 1773 return 1774 1775 @dbus_safe(False) 1776 def set_quality_debug_log(self, enable): 1777 """Enable or disable bluez quality debug log in the DUT 1778 @param enable: True to enable all of the debug log, 1779 False to disable all of the debug log. 1780 """ 1781 bluez_debug = self.bus.get( 1782 self.BLUEZ_SERVICE_NAME, self.BLUEZ_DEBUG_LOG_PATH)[ 1783 self.BLUEZ_DEBUG_LOG_IFACE] 1784 bluez_debug.SetQualityDebug(enable) 1785 1786 @dbus_safe(False) 1787 def start_bluetoothd(self): 1788 """start bluetoothd. 1789 1790 This includes powering up the adapter. 1791 1792 @returns: True if bluetoothd is started correctly. 1793 False otherwise. 1794 1795 """ 1796 # Always start bluez tests with Floss disabled 1797 self.configure_floss(enabled=False) 1798 1799 # Start the daemon and exit if that fails. 1800 if not UpstartClient.start(self.BLUETOOTHD_JOB): 1801 return False 1802 1803 logging.debug('waiting for bluez start') 1804 try: 1805 utils.poll_for_condition(condition=self._update_bluez, 1806 desc='Bluetooth Daemon has started.', 1807 timeout=self.ADAPTER_TIMEOUT) 1808 except Exception as e: 1809 logging.error('timeout: error starting bluetoothd: %s', e) 1810 return False 1811 1812 # Waiting for the self._adapter object. 1813 # This does not mean that the adapter is powered on. 1814 logging.debug('waiting for bluez to obtain adapter information') 1815 try: 1816 utils.poll_for_condition( 1817 condition=self._update_adapter, 1818 desc='Bluetooth Daemon has adapter information.', 1819 timeout=self.ADAPTER_TIMEOUT) 1820 except Exception as e: 1821 logging.error('timeout: error starting adapter: %s', e) 1822 return False 1823 1824 # Waiting for the self._advertising interface object. 1825 logging.debug('waiting for bluez to obtain interface manager.') 1826 try: 1827 utils.poll_for_condition( 1828 condition=self._update_advertising, 1829 desc='Bluetooth Daemon has advertising interface.', 1830 timeout=self.ADAPTER_TIMEOUT) 1831 except utils.TimeoutError: 1832 logging.error('timeout: error getting advertising interface') 1833 return False 1834 1835 # Register the pairing agent so we can authorize connections 1836 logging.debug('registering default pairing agent') 1837 self._setup_pairing_agent(0) 1838 1839 return True 1840 1841 @dbus_safe(False) 1842 def stop_bluetoothd(self): 1843 """stop bluetoothd. 1844 1845 @returns: True if bluetoothd is stopped correctly. 1846 False otherwise. 1847 1848 """ 1849 1850 def bluez_stopped(): 1851 """Checks the bluetooth daemon status. 1852 1853 @returns: True if bluez is stopped. False otherwise. 1854 1855 """ 1856 return not self._update_bluez() 1857 1858 # Stop the daemon and exit if that fails. 1859 if not UpstartClient.stop(self.BLUETOOTHD_JOB): 1860 return False 1861 1862 logging.debug('waiting for bluez stop') 1863 try: 1864 utils.poll_for_condition(condition=bluez_stopped, 1865 desc='Bluetooth Daemon has stopped.', 1866 timeout=self.ADAPTER_TIMEOUT) 1867 bluetoothd_stopped = True 1868 except Exception as e: 1869 logging.error('timeout: error stopping bluetoothd: %s', e) 1870 bluetoothd_stopped = False 1871 1872 return bluetoothd_stopped 1873 1874 def restart_cras(self): 1875 """Restarts the cras daemon.""" 1876 return self._restart_cras() 1877 1878 def is_bluetoothd_running(self): 1879 """Is bluetoothd running? 1880 1881 @returns: True if bluetoothd is running 1882 1883 """ 1884 return bool(self._get_dbus_proxy_for_bluetoothd()) 1885 1886 def is_bluetoothd_proxy_valid(self): 1887 """Checks whether the proxy object for bluetoothd is ok. 1888 1889 The dbus proxy object (self._bluez) can become unusable if bluetoothd 1890 crashes or restarts for any reason. This method checks whether this has 1891 happened by attempting to use the object proxy. If bluetoothd has 1892 restarted (or is not available), then the session will no longer be 1893 valid and this will result in a dbus exception (GLib.Error). 1894 1895 Returns: 1896 True if the bluez proxy is still usable. False otherwise. 1897 """ 1898 1899 try: 1900 return self.is_bluetoothd_running() and bool( 1901 self._objmgr_proxy) and bool( 1902 self._objmgr_proxy.GetManagedObjects()) 1903 except GLib.Error: 1904 return False 1905 1906 def _update_bluez(self): 1907 """Store a D-Bus proxy for the Bluetooth daemon in self._bluez. 1908 1909 This may be called in a loop until it returns True to wait for the 1910 daemon to be ready after it has been started. 1911 1912 @return True on success, False otherwise. 1913 1914 """ 1915 self._bluez = self._get_dbus_proxy_for_bluetoothd() 1916 return bool(self._bluez) 1917 1918 @property 1919 def _objmgr_proxy(self): 1920 """Returns proxy object to object manager if bluez is valid.""" 1921 if self._bluez: 1922 return self._bluez[self.BLUEZ_MANAGER_IFACE] 1923 1924 return None 1925 1926 @dbus_safe(False) 1927 def _get_dbus_proxy_for_bluetoothd(self): 1928 """Get the D-Bus proxy for the Bluetooth daemon. 1929 1930 @return True on success, False otherwise. 1931 1932 """ 1933 bluez = None 1934 try: 1935 bluez = self.bus.get(self.BLUEZ_SERVICE_NAME, 1936 self.BLUEZ_MANAGER_PATH) 1937 logging.debug('bluetoothd is running') 1938 except GLib.Error as e: 1939 # When bluetoothd is not running, the exception looks like 1940 # org.freedesktop.DBus.Error.ServiceUnknown: The name org.bluez 1941 # was not provided by any .service files 1942 if self.DBUS_ERROR_SERVICEUNKNOWN in str(e): 1943 logging.debug('bluetoothd is not running') 1944 else: 1945 logging.error('Error getting dbus proxy for Bluez: %s', e) 1946 return bluez 1947 1948 def _update_adapter(self): 1949 """Store a D-Bus proxy for the local adapter in self._adapter. 1950 1951 This may be called in a loop until it returns True to wait for the 1952 daemon to be ready, and have obtained the adapter information itself, 1953 after it has been started. 1954 1955 Since not all devices will have adapters, this will also return True 1956 in the case where we have obtained an empty adapter index list from the 1957 kernel. 1958 1959 Note that this method does not power on the adapter. 1960 1961 @return True on success, including if there is no local adapter, 1962 False otherwise. 1963 1964 """ 1965 self._adapter = None 1966 self._adapter_path = None 1967 1968 # Re-check kernel to make sure adapter is available 1969 self._has_adapter = len(self._control.read_index_list()) > 0 1970 1971 if self._bluez is None: 1972 logging.warning('Bluez not found!') 1973 return False 1974 if not self._has_adapter: 1975 logging.debug('Device has no adapter; returning') 1976 return True 1977 (self._adapter, self._adapter_path) = self._get_adapter() 1978 return bool(self._adapter) 1979 1980 def _update_advertising(self): 1981 """Store a D-Bus proxy for the local advertising interface manager. 1982 1983 This may be called repeatedly in a loop until True is returned; 1984 otherwise we wait for bluetoothd to start. After bluetoothd starts, we 1985 check the existence of a local adapter and proceed to get the 1986 advertisement interface manager. 1987 1988 Since not all devices will have adapters, this will also return True 1989 in the case where there is no adapter. 1990 1991 @return True on success, including if there is no local adapter, 1992 False otherwise. 1993 1994 """ 1995 self._advertising = None 1996 if self._bluez is None: 1997 logging.warning('Bluez not found!') 1998 return False 1999 if not self._has_adapter: 2000 logging.debug('Device has no adapter; returning') 2001 return True 2002 self._advertising = self._advertising_proxy 2003 return bool(self._advertising) 2004 2005 def _update_adv_monitor_manager(self): 2006 """Store a D-Bus proxy for the local advertisement monitor manager. 2007 2008 This may be called repeatedly in a loop until True is returned; 2009 otherwise we wait for bluetoothd to start. After bluetoothd starts, we 2010 check the existence of a local adapter and proceed to get the 2011 advertisement monitor manager interface. 2012 2013 Since not all devices will have adapters, this will also return True 2014 in the case where there is no adapter. 2015 2016 @return True on success, including if there is no local adapter, 2017 False otherwise. 2018 2019 """ 2020 self._adv_monitor_manager = None 2021 if self._bluez is None: 2022 logging.warning('Bluez not found!') 2023 return False 2024 if not self._has_adapter: 2025 logging.debug('Device has no adapter; returning without ' 2026 'advertisement monitor manager') 2027 return True 2028 self._adv_monitor_manager = self._get_adv_monitor_manager() 2029 return bool(self._adv_monitor_manager) 2030 2031 @dbus_safe(False) 2032 def _get_adapter(self): 2033 """Get the D-Bus proxy for the local adapter. 2034 2035 @return Tuple of (adapter, object_path) on success else (None, None). 2036 2037 """ 2038 objects = self._objmgr_proxy.GetManagedObjects() 2039 for path, ifaces in six.iteritems(objects): 2040 logging.debug('%s -> %r', path, list(ifaces.keys())) 2041 if self.BLUEZ_ADAPTER_IFACE in ifaces: 2042 logging.debug('using adapter %s', path) 2043 adapter = self.bus.get(self.BLUEZ_SERVICE_NAME, path) 2044 return (adapter, path) 2045 else: 2046 logging.warning('No adapter found in interface!') 2047 return (None, None) 2048 2049 @property 2050 def _adapter_proxy(self): 2051 """Returns proxy object to adapter interface if adapter is valid.""" 2052 if self._adapter: 2053 return self._adapter[self.BLUEZ_ADAPTER_IFACE] 2054 2055 return None 2056 2057 @property 2058 def _property_proxy(self): 2059 """Returns proxy object to adapter properties if adapter is valid.""" 2060 if self._adapter: 2061 return self._adapter[self.DBUS_PROP_IFACE] 2062 2063 return None 2064 2065 @property 2066 def _advertising_proxy(self): 2067 """Returns proxy object to advertising interface if adapter is valid.""" 2068 if self._adapter: 2069 return self._adapter[self.BLUEZ_LE_ADVERTISING_MANAGER_IFACE] 2070 2071 return None 2072 2073 @dbus_safe(False) 2074 def _get_adv_monitor_manager(self): 2075 """Get the D-Bus proxy for the local advertisement monitor manager. 2076 2077 @return the advertisement monitor manager interface object. 2078 2079 """ 2080 return self._adapter[self.BLUEZ_ADV_MONITOR_MANAGER_IFACE] 2081 2082 @dbus_safe(False) 2083 def reset_on(self): 2084 """Reset the adapter and settings and power up the adapter. 2085 2086 @return True on success, False otherwise. 2087 2088 """ 2089 return self._reset(set_power=True) 2090 2091 @dbus_safe(False) 2092 def reset_off(self): 2093 """Reset the adapter and settings, leave the adapter powered off. 2094 2095 @return True on success, False otherwise. 2096 2097 """ 2098 return self._reset(set_power=False) 2099 2100 def has_adapter(self): 2101 """Return if an adapter is present. 2102 2103 This will only return True if we have determined both that there is 2104 a Bluetooth adapter on this device (kernel adapter index list is not 2105 empty) and that the Bluetooth daemon has exported an object for it. 2106 2107 @return True if an adapter is present, False if not. 2108 2109 """ 2110 return self._has_adapter and self._adapter is not None 2111 2112 def _reset(self, set_power=False): 2113 """Remove remote devices and set adapter to set_power state. 2114 2115 Do not restart bluetoothd as this may incur a side effect. 2116 The unhappy chrome may disable the adapter randomly. 2117 2118 @param set_power: adapter power state to set (True or False). 2119 2120 @return True on success, False otherwise. 2121 2122 """ 2123 logging.debug('_reset') 2124 2125 if not self._adapter: 2126 logging.warning('Adapter not found!') 2127 return False 2128 2129 objects = self._objmgr_proxy.GetManagedObjects() 2130 2131 devices = [] 2132 for path, ifaces in six.iteritems(objects): 2133 if self.BLUEZ_DEVICE_IFACE in ifaces: 2134 devices.append(objects[path][self.BLUEZ_DEVICE_IFACE]) 2135 2136 # Turn on the adapter in order to remove all remote devices. 2137 if not self.is_powered_on(): 2138 if not self.set_powered(True): 2139 logging.warning('Unable to power on the adapter') 2140 return False 2141 2142 for device in devices: 2143 logging.debug('removing %s', device.get('Address')) 2144 self.remove_device_object(device.get('Address')) 2145 2146 # Toggle power to the adapter. 2147 if not self.set_powered(False): 2148 logging.warning('Unable to power off adapter') 2149 return False 2150 if set_power and not self.set_powered(True): 2151 logging.warning('Unable to power on adapter') 2152 return False 2153 2154 return True 2155 2156 @dbus_safe(False) 2157 def is_discoverable(self): 2158 """Returns whether the adapter is discoverable.""" 2159 return bool(self._get_adapter_properties().get('Discoverable') == 1) 2160 2161 @dbus_safe(False) 2162 def set_powered(self, powered): 2163 """Set the adapter power state. 2164 2165 @param powered: adapter power state to set (True or False). 2166 2167 @return True on success, False otherwise. 2168 2169 """ 2170 if not self._adapter: 2171 if not powered: 2172 # Return success if we are trying to power off an adapter that's 2173 # missing or gone away, since the expected result has happened. 2174 return True 2175 else: 2176 logging.warning('Adapter not found!') 2177 return False 2178 2179 logging.debug('_set_powered %r', powered) 2180 self._property_proxy.Set(self.BLUEZ_ADAPTER_IFACE, 'Powered', 2181 GLib.Variant('b', powered)) 2182 2183 return True 2184 2185 @dbus_safe(False) 2186 def set_discoverable(self, discoverable): 2187 """Set the adapter discoverable state. 2188 2189 @param discoverable: adapter discoverable state to set (True or False). 2190 2191 @return True on success, False otherwise. 2192 2193 """ 2194 if not discoverable and not self._adapter: 2195 # Return success if we are trying to make an adapter that's 2196 # missing or gone away, undiscoverable, since the expected result 2197 # has happened. 2198 return True 2199 self._property_proxy.Set(self.BLUEZ_ADAPTER_IFACE, 'Discoverable', 2200 GLib.Variant('b', discoverable)) 2201 return True 2202 2203 @dbus_safe(False) 2204 def get_discoverable_timeout(self): 2205 """Get the adapter discoverable_timeout. 2206 2207 @return True on success, False otherwise. 2208 2209 """ 2210 return int( 2211 self._property_proxy.Get(self.BLUEZ_ADAPTER_IFACE, 2212 'DiscoverableTimeout')) 2213 2214 @dbus_safe(False) 2215 def set_discoverable_timeout(self, discoverable_timeout): 2216 """Set the adapter discoverable_timeout property. 2217 2218 @param discoverable_timeout: adapter discoverable_timeout value 2219 in seconds to set (Integer). 2220 2221 @return True on success, False otherwise. 2222 2223 """ 2224 self._property_proxy.Set(self.BLUEZ_ADAPTER_IFACE, 2225 'DiscoverableTimeout', 2226 GLib.Variant('u', discoverable_timeout)) 2227 return True 2228 2229 @dbus_safe(False) 2230 def get_pairable_timeout(self): 2231 """Get the adapter pairable_timeout. 2232 2233 @return True on success, False otherwise. 2234 2235 """ 2236 return int( 2237 self._property_proxy.Get(self.BLUEZ_ADAPTER_IFACE, 2238 'PairableTimeout')) 2239 2240 @dbus_safe(False) 2241 def set_pairable_timeout(self, pairable_timeout): 2242 """Set the adapter pairable_timeout property. 2243 2244 @param pairable_timeout: adapter pairable_timeout value 2245 in seconds to set (Integer). 2246 2247 @return True on success, False otherwise. 2248 2249 """ 2250 self._property_proxy.Set(self.BLUEZ_ADAPTER_IFACE, 'PairableTimeout', 2251 GLib.Variant('u', pairable_timeout)) 2252 return True 2253 2254 @dbus_safe(False) 2255 def get_pairable(self): 2256 """Gets the adapter pairable state. 2257 2258 @return Pairable property value. 2259 """ 2260 return bool( 2261 self._property_proxy.Get(self.BLUEZ_ADAPTER_IFACE, 'Pairable')) 2262 2263 @dbus_safe(False) 2264 def set_pairable(self, pairable): 2265 """Set the adapter pairable state. 2266 2267 @param pairable: adapter pairable state to set (True or False). 2268 2269 @return True on success, False otherwise. 2270 2271 """ 2272 self._property_proxy.Set(self.BLUEZ_ADAPTER_IFACE, 'Pairable', 2273 GLib.Variant('b', pairable)) 2274 return True 2275 2276 @dbus_safe(False) 2277 def set_adapter_alias(self, alias): 2278 """Set the adapter alias. 2279 2280 @param alias: adapter alias to set with type String 2281 2282 @return True on success, False otherwise. 2283 """ 2284 self._property_proxy.Set(self.BLUEZ_ADAPTER_IFACE, 'Alias', 2285 GLib.Variant('s', alias)) 2286 return True 2287 2288 def _get_adapter_properties(self): 2289 """Read the adapter properties from the Bluetooth Daemon. 2290 2291 @return the properties as a JSON-encoded dictionary on success, 2292 the value False otherwise. 2293 2294 """ 2295 2296 @dbus_safe({}) 2297 def get_props(): 2298 """Get props from dbus.""" 2299 objects = self._objmgr_proxy.GetManagedObjects() 2300 return objects[self._adapter_path][self.BLUEZ_ADAPTER_IFACE] 2301 2302 if self._bluez and self._adapter: 2303 props = get_props().copy() 2304 else: 2305 props = {} 2306 logging.debug('get_adapter_properties') 2307 for i in props.items(): 2308 logging.debug(i) 2309 return props 2310 2311 def get_adapter_properties(self): 2312 return json.dumps(self._get_adapter_properties()) 2313 2314 def is_powered_on(self): 2315 """Checks whether the adapter is currently powered.""" 2316 return bool(self._get_adapter_properties().get('Powered')) 2317 2318 def get_address(self): 2319 """Gets the current bluez adapter address.""" 2320 return str(self._get_adapter_properties()['Address']) 2321 2322 def get_bluez_version(self): 2323 """Get the BlueZ version. 2324 2325 Returns: 2326 Bluez version like 'BlueZ 5.39'. 2327 """ 2328 return str(self._get_adapter_properties()['Name']) 2329 2330 def get_bluetooth_class(self): 2331 """Get the bluetooth class of the adapter. 2332 2333 Example for Chromebook: 4718852 2334 2335 Returns: 2336 Class of device for the adapter. 2337 """ 2338 return str(self._get_adapter_properties()['Class']) 2339 2340 def read_version(self): 2341 """Read the version of the management interface from the Kernel. 2342 2343 @return the information as a JSON-encoded tuple of: 2344 ( version, revision ) 2345 2346 """ 2347 #TODO(howardchung): resolve 'cannot allocate memory' error when 2348 # BluetoothControlSocket idle too long(about 3 secs) 2349 # (b:137603211) 2350 _control = bluetooth_socket.BluetoothControlSocket() 2351 return json.dumps(_control.read_version()) 2352 2353 def read_supported_commands(self): 2354 """Read the set of supported commands from the Kernel. 2355 2356 @return the information as a JSON-encoded tuple of: 2357 ( commands, events ) 2358 2359 """ 2360 #TODO(howardchung): resolve 'cannot allocate memory' error when 2361 # BluetoothControlSocket idle too long(about 3 secs) 2362 # (b:137603211) 2363 _control = bluetooth_socket.BluetoothControlSocket() 2364 return json.dumps(_control.read_supported_commands()) 2365 2366 def read_index_list(self): 2367 """Read the list of currently known controllers from the Kernel. 2368 2369 @return the information as a JSON-encoded array of controller indexes. 2370 2371 """ 2372 #TODO(howardchung): resolve 'cannot allocate memory' error when 2373 # BluetoothControlSocket idle too long(about 3 secs) 2374 # (b:137603211) 2375 _control = bluetooth_socket.BluetoothControlSocket() 2376 return json.dumps(_control.read_index_list()) 2377 2378 def read_info(self): 2379 """Read the adapter information from the Kernel. 2380 2381 @return the information as a JSON-encoded tuple of: 2382 ( address, bluetooth_version, manufacturer_id, 2383 supported_settings, current_settings, class_of_device, 2384 name, short_name ) 2385 2386 """ 2387 #TODO(howardchung): resolve 'cannot allocate memory' error when 2388 # BluetoothControlSocket idle too long(about 3 secs) 2389 # (b:137603211) 2390 _control = bluetooth_socket.BluetoothControlSocket() 2391 return json.dumps(_control.read_info(0)) 2392 2393 def add_device(self, address, address_type, action): 2394 """Add a device to the Kernel action list. 2395 2396 @param address: Address of the device to add. 2397 @param address_type: Type of device in @address. 2398 @param action: Action to take. 2399 2400 @return on success, a JSON-encoded typle of: 2401 ( address, address_type ), None on failure. 2402 2403 """ 2404 #TODO(howardchung): resolve 'cannot allocate memory' error when 2405 # BluetoothControlSocket idle too long(about 3 secs) 2406 # (b:137603211) 2407 _control = bluetooth_socket.BluetoothControlSocket() 2408 return json.dumps(_control.add_device(0, address, address_type, 2409 action)) 2410 2411 def remove_device(self, address, address_type): 2412 """Remove a device from the Kernel action list. 2413 2414 @param address: Address of the device to remove. 2415 @param address_type: Type of device in @address. 2416 2417 @return on success, a JSON-encoded typle of: 2418 ( address, address_type ), None on failure. 2419 2420 """ 2421 #TODO(howardchung): resolve 'cannot allocate memory' error when 2422 # BluetoothControlSocket idle too long(about 3 secs) 2423 # (b:137603211) 2424 _control = bluetooth_socket.BluetoothControlSocket() 2425 return json.dumps(_control.remove_device(0, address, address_type)) 2426 2427 @dbus_safe(False) 2428 def _get_devices(self): 2429 """Read information about remote devices known to the adapter. 2430 2431 @return the properties of each device in a list 2432 2433 """ 2434 objects = self._objmgr_proxy.GetManagedObjects() 2435 devices = [] 2436 for path, ifaces in six.iteritems(objects): 2437 if self.BLUEZ_DEVICE_IFACE in ifaces: 2438 devices.append(objects[path][self.BLUEZ_DEVICE_IFACE]) 2439 return devices 2440 2441 def _encode_json(self, data): 2442 """Encodes input data as JSON object. 2443 2444 Note that for bytes elements in the input data, they are decoded as 2445 unicode string. 2446 2447 @param data: data to be JSON encoded 2448 2449 @return: JSON encoded data 2450 """ 2451 logging.debug('_encode_json raw data is %s', data) 2452 str_data = utils.bytes_to_str_recursive(data) 2453 json_encoded = json.dumps(str_data) 2454 logging.debug('JSON encoded data is %s', json_encoded) 2455 return json_encoded 2456 2457 def get_devices(self): 2458 """Read information about remote devices known to the adapter. 2459 2460 @return the properties of each device as a JSON-encoded array of 2461 dictionaries on success, the value False otherwise. 2462 2463 """ 2464 devices = self._get_devices() 2465 # Note that bluetooth facade now runs in Python 3. 2466 # Refer to crrev.com/c/3268347. 2467 return self._encode_json(devices) 2468 2469 def get_num_connected_devices(self): 2470 """ Return number of remote devices currently connected to the DUT. 2471 2472 @returns: The number of devices known to bluez with the Connected 2473 property active 2474 """ 2475 num_connected_devices = 0 2476 for dev in self._get_devices(): 2477 if dev and dev.get('Connected', False): 2478 num_connected_devices += 1 2479 2480 return num_connected_devices 2481 2482 @dbus_safe(None) 2483 def get_device_property(self, address, prop_name): 2484 """Read a property of BT device by directly querying device dbus object 2485 2486 @param address: Address of the device to query 2487 @param prop_name: Property to be queried 2488 2489 @return Base 64 JSON repr of property if device is found and has 2490 property, otherwise None on failure. JSON is a recursive 2491 converter, automatically converting dbus types to python natives 2492 and base64 allows us to pass special characters over xmlrpc. 2493 Decode is done in bluetooth_device.py 2494 """ 2495 2496 prop_val = None 2497 2498 # Grab dbus object, _find_device will catch any thrown dbus error 2499 device_obj = self._find_device(address) 2500 2501 if device_obj: 2502 # Query dbus object for property 2503 prop_val = unpack_if_variant(device_obj[self.DBUS_PROP_IFACE].Get( 2504 self.BLUEZ_DEVICE_IFACE, prop_name)) 2505 2506 return self._encode_json(prop_val) 2507 2508 @dbus_safe(None) 2509 def get_battery_property(self, address, prop_name): 2510 """Read a property from Battery1 interface. 2511 2512 @param address: Address of the device to query 2513 @param prop_name: Property to be queried 2514 2515 @return The battery percentage value, or None if does not exist. 2516 """ 2517 2518 prop_val = None 2519 2520 # Grab dbus object, _find_battery will catch any thrown dbus error 2521 battery_obj = self._find_battery(address) 2522 2523 if battery_obj: 2524 # Query dbus object for property 2525 prop_val = unpack_if_variant(battery_obj[self.DBUS_PROP_IFACE].Get( 2526 self.BLUEZ_BATTERY_IFACE, prop_name)) 2527 2528 return prop_val 2529 2530 @dbus_safe(False) 2531 def set_discovery_filter(self, filter): 2532 """Set the discovery filter. 2533 2534 @param filter: The discovery filter to set. 2535 2536 @return True on success, False otherwise. 2537 2538 """ 2539 if not self._adapter: 2540 return False 2541 2542 converted_filter = {} 2543 for key in filter: 2544 converted_filter[key] = GLib.Variant('s', filter[key]) 2545 2546 self._adapter_proxy.SetDiscoveryFilter(converted_filter) 2547 return True 2548 2549 @dbus_safe(False, return_error=True) 2550 def start_discovery(self): 2551 """Start discovery of remote devices. 2552 2553 Obtain the discovered device information using get_devices(), called 2554 stop_discovery() when done. 2555 2556 @return True on success, False otherwise. 2557 2558 """ 2559 if not self._adapter: 2560 return (False, "Adapter Not Found") 2561 self._adapter_proxy.StartDiscovery() 2562 return (True, None) 2563 2564 @dbus_safe(False, return_error=True) 2565 def stop_discovery(self): 2566 """Stop discovery of remote devices. 2567 2568 @return True on success, False otherwise. 2569 2570 """ 2571 if not self._adapter: 2572 return (False, "Adapter Not Found") 2573 self._adapter_proxy.StopDiscovery() 2574 return (True, None) 2575 2576 def is_discovering(self): 2577 """Check if adapter is discovering.""" 2578 return self._get_adapter_properties().get('Discovering', 0) == 1 2579 2580 def get_dev_info(self): 2581 """Read raw HCI device information. 2582 2583 @return JSON-encoded tuple of: 2584 (index, name, address, flags, device_type, bus_type, 2585 features, pkt_type, link_policy, link_mode, 2586 acl_mtu, acl_pkts, sco_mtu, sco_pkts, 2587 err_rx, err_tx, cmd_tx, evt_rx, acl_tx, acl_rx, 2588 sco_tx, sco_rx, byte_rx, byte_tx) on success, 2589 None on failure. 2590 2591 """ 2592 return json.dumps(self._raw.get_dev_info(0)) 2593 2594 @dbus_safe(None, return_error=True) 2595 def get_supported_capabilities(self): 2596 """ Get supported capabilities of the adapter 2597 2598 @returns (capabilities, None) on Success. (None, <error>) on failure 2599 """ 2600 value = self._adapter_proxy.GetSupportedCapabilities() 2601 return (json.dumps(value), None) 2602 2603 @dbus_safe(False) 2604 def register_profile(self, path, uuid, options): 2605 """Register new profile (service). 2606 2607 @param path: Path to the profile object. 2608 @param uuid: Service Class ID of the service as string. 2609 @param options: Dictionary of options for the new service, compliant 2610 with BlueZ D-Bus Profile API standard. 2611 2612 @return True on success, False otherwise. 2613 2614 """ 2615 converted_options = {} 2616 if 'ServiceRecord' in options: 2617 converted_options['ServiceRecord'] = GLib.Variant( 2618 's', options['ServiceRecord']) 2619 2620 profile_manager = self.bus.get( 2621 self.BLUEZ_SERVICE_NAME, self.BLUEZ_PROFILE_MANAGER_PATH)[ 2622 self.BLUEZ_PROFILE_MANAGER_IFACE] 2623 profile_manager.RegisterProfile(path, uuid, converted_options) 2624 return True 2625 2626 def has_device(self, address): 2627 """Checks if the device with a given address exists. 2628 2629 @param address: Address of the device. 2630 2631 @returns: True if there is an interface object with that address. 2632 False if the device is not found. 2633 2634 @raises: Exception if a D-Bus error is encountered. 2635 2636 """ 2637 result = self._find_device(address) 2638 logging.debug('has_device result: %s', str(result)) 2639 2640 # The result being False indicates that there is a D-Bus error. 2641 if result is False: 2642 raise Exception('dbus.Interface error') 2643 2644 # Return True if the result is not None, e.g. a D-Bus interface object; 2645 # False otherwise. 2646 return bool(result) 2647 2648 @dbus_safe(False) 2649 def _find_device(self, address): 2650 """Finds the device with a given address. 2651 2652 Find the device with a given address and returns the 2653 device interface. 2654 2655 @param address: Address of the device. 2656 2657 @returns: An 'org.bluez.Device1' interface to the device. 2658 None if device can not be found. 2659 """ 2660 path = self._get_device_path(address) 2661 if path: 2662 return self.bus.get(self.BLUEZ_SERVICE_NAME, path) 2663 logging.info('Device not found') 2664 return None 2665 2666 @dbus_safe(None) 2667 def _find_battery(self, address): 2668 """Finds the battery with a given address. 2669 2670 Find the battery with a given address and returns the 2671 battery interface. 2672 2673 @param address: Address of the device. 2674 2675 @returns: An 'org.bluez.Battery1' interface to the device. 2676 None if device can not be found. 2677 """ 2678 path = self._get_device_path(address) 2679 if path: 2680 try: 2681 obj = self.bus.get(self.BLUEZ_SERVICE_NAME, path) 2682 if obj[self.BLUEZ_BATTERY_IFACE] is not None: 2683 return obj 2684 except: 2685 pass 2686 logging.info('Battery not found') 2687 return None 2688 2689 @dbus_safe(False) 2690 def _get_device_path(self, address): 2691 """Gets the path for a device with a given address. 2692 2693 Find the device with a given address and returns the 2694 the path for the device. 2695 2696 @param address: Address of the device. 2697 2698 @returns: The path to the address of the device, or None if device is 2699 not found in the object tree. 2700 2701 """ 2702 2703 # Create device path, i.e. '/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF' based 2704 # on path assignment scheme used in bluez 2705 address_up = address.replace(':', '_') 2706 device_path = '{}/dev_{}'.format(self._adapter_path, address_up) 2707 2708 # Verify the Address property agrees to confirm we have the device 2709 try: 2710 device = self.bus.get(self.BLUEZ_SERVICE_NAME, device_path) 2711 found_addr = device[self.DBUS_PROP_IFACE].Get( 2712 self.BLUEZ_DEVICE_IFACE, 'Address') 2713 2714 if found_addr == address: 2715 logging.info('Device found at {}'.format(device_path)) 2716 return device_path 2717 2718 except KeyError as ke: 2719 logging.debug('Couldn\'t reach device: %s: %s', address, ke) 2720 except GLib.Error as e: 2721 log_msg = 'Couldn\'t reach device: {}'.format(str(e)) 2722 logging.debug(log_msg) 2723 2724 logging.debug('No device found at {}'.format(device_path)) 2725 return None 2726 2727 @dbus_safe(False) 2728 def _setup_pairing_agent(self, pin): 2729 """Initializes and resiters a BluezPairingAgent to handle authentication. 2730 2731 @param pin: The pin code this agent will answer. 2732 2733 """ 2734 if self._pairing_agent: 2735 logging.info( 2736 'Removing the old agent before initializing a new one') 2737 self._pairing_agent.unregister() 2738 self._pairing_agent = None 2739 2740 # Create and register pairing agent 2741 self._pairing_agent = BluezPairingAgent(self.bus, self.AGENT_PATH, pin) 2742 2743 agent_manager = self.bus.get( 2744 self.BLUEZ_SERVICE_NAME, 2745 self.BLUEZ_AGENT_MANAGER_PATH)[self.BLUEZ_AGENT_MANAGER_IFACE] 2746 try: 2747 # Make sure agent is accessible on bus 2748 #agent_obj = self.bus.get(self.BLUEZ_SERVICE_NAME, self.AGENT_PATH) 2749 agent_manager.RegisterAgent(self.AGENT_PATH, str(self._capability)) 2750 except GLib.Error as e: 2751 if self.BLUEZ_ERROR_ALREADY_EXISTS in str(e): 2752 logging.info('Unregistering old agent and registering the new') 2753 agent_manager.UnregisterAgent(self.AGENT_PATH) 2754 agent_manager.RegisterAgent(self.AGENT_PATH, 2755 str(self._capability)) 2756 else: 2757 logging.error('Error setting up pin agent: %s', e) 2758 raise 2759 except Exception as e: 2760 logging.debug('Setup pairing agent: %s', str(e)) 2761 raise 2762 logging.info('Agent registered: %s', self.AGENT_PATH) 2763 2764 @dbus_safe(False) 2765 def _is_paired(self, device): 2766 """Checks if a device is paired. 2767 2768 @param device: An 'org.bluez.Device1' interface to the device. 2769 2770 @returns: True if device is paired. False otherwise. 2771 2772 """ 2773 props = device[self.DBUS_PROP_IFACE] 2774 paired = props.Get(self.BLUEZ_DEVICE_IFACE, 'Paired') 2775 return bool(paired) 2776 2777 @dbus_safe(False) 2778 def device_is_paired(self, address): 2779 """Checks if a device is paired. 2780 2781 @param address: address of the device. 2782 2783 @returns: True if device is paired. False otherwise. 2784 2785 """ 2786 device = self._find_device(address) 2787 if not device: 2788 logging.error('Device not found') 2789 return False 2790 return self._is_paired(device) 2791 2792 @dbus_safe(False) 2793 def _is_connected(self, device): 2794 """Checks if a device is connected. 2795 2796 @param device: An 'org.bluez.Device1' interface to the device. 2797 2798 @returns: True if device is connected. False otherwise. 2799 2800 """ 2801 props = device[self.DBUS_PROP_IFACE] 2802 connected = props.Get(self.BLUEZ_DEVICE_IFACE, 'Connected') 2803 logging.info('Got connected = %r', connected) 2804 return bool(connected) 2805 2806 @dbus_safe(False) 2807 def _set_trusted_by_device(self, device, trusted=True): 2808 """Set the device trusted by device object. 2809 2810 @param device: the device object to set trusted. 2811 @param trusted: True or False indicating whether to set trusted or not. 2812 2813 @returns: True if successful. False otherwise. 2814 2815 """ 2816 try: 2817 properties = device[self.DBUS_PROP_IFACE] 2818 properties.Set(self.BLUEZ_DEVICE_IFACE, 'Trusted', 2819 GLib.Variant('b', trusted)) 2820 return True 2821 except Exception as e: 2822 logging.error('_set_trusted_by_device: %s', e) 2823 except: 2824 logging.error('_set_trusted_by_device: unexpected error') 2825 return False 2826 2827 @dbus_safe(False) 2828 def _set_trusted_by_path(self, device_path, trusted=True): 2829 """Set the device trusted by the device path. 2830 2831 @param device_path: the object path of the device. 2832 @param trusted: True or False indicating whether to set trusted or not. 2833 2834 @returns: True if successful. False otherwise. 2835 2836 """ 2837 try: 2838 device = self.bus.get(self.BLUEZ_SERVICE_NAME, device_path) 2839 return self._set_trusted_by_device(device, trusted) 2840 except Exception as e: 2841 logging.error('_set_trusted_by_path: %s', e) 2842 except: 2843 logging.error('_set_trusted_by_path: unexpected error') 2844 return False 2845 2846 @dbus_safe(False) 2847 def set_trusted(self, address, trusted=True): 2848 """Set the device trusted by address. 2849 2850 @param address: The bluetooth address of the device. 2851 @param trusted: True or False indicating whether to set trusted or not. 2852 2853 @returns: True if successful. False otherwise. 2854 2855 """ 2856 try: 2857 device = self._find_device(address) 2858 return self._set_trusted_by_device(device, trusted) 2859 except Exception as e: 2860 logging.error('set_trusted: %s', e) 2861 except: 2862 logging.error('set_trusted: unexpected error') 2863 return False 2864 2865 @dbus_safe(False) 2866 def pair_legacy_device(self, address, pin, trusted, timeout=60): 2867 """Pairs a device with a given pin code. 2868 2869 Registers a agent who handles pin code request and 2870 pairs a device with known pin code. After pairing, this function will 2871 automatically connect to the device as well (prevents timing issues 2872 between pairing and connect and reduces overall test execution time). 2873 2874 @param address: Address of the device to pair. 2875 @param pin: The pin code of the device to pair. 2876 @param trusted: indicating whether to set the device trusted. 2877 @param timeout: The timeout in seconds for pairing. 2878 2879 @returns: True on success. False otherwise. 2880 2881 """ 2882 2883 def connect_reply(): 2884 """Handler when connect succeeded.""" 2885 logging.info('Device connected: %s', device_path) 2886 2887 def connect_error(error): 2888 """Handler when connect failed. 2889 2890 @param error: one of the errors defined in org.bluez.Error 2891 representing the error in connect. 2892 """ 2893 logging.error('Connect device failed: %s', error) 2894 2895 def pair_reply(): 2896 """Handler when pairing succeeded.""" 2897 logging.info('Device paired: %s', device_path) 2898 if trusted: 2899 self._set_trusted_by_path(device_path, trusted=True) 2900 logging.info('Device trusted: %s', device_path) 2901 2902 # On finishing pairing, also connect 2903 self.dbus_method_with_handlers(device.Connect, 2904 connect_reply, 2905 connect_error, 2906 timeout=timeout * 1000) 2907 2908 def pair_error(error): 2909 """Handler when pairing failed. 2910 2911 @param error: one of errors defined in org.bluez.Error representing 2912 the error in pairing. 2913 2914 """ 2915 if 'org.freedesktop.DBus.Error.NoReply' in str(error): 2916 logging.error('Timed out after %d ms. Cancelling pairing.', 2917 timeout) 2918 device.CancelPairing() 2919 else: 2920 logging.error('Pairing device failed: %s', error) 2921 2922 device = self._find_device(address) 2923 if not device: 2924 logging.error('Device not found') 2925 return False 2926 2927 device_path = self._get_device_path(address) 2928 logging.info('Device %s is found.', device_path) 2929 2930 self._setup_pairing_agent(pin) 2931 2932 try: 2933 if not self._is_paired(device): 2934 logging.info('Device is not paired. Pair and Connect.') 2935 self.dbus_method_with_handlers(device.Pair, 2936 pair_reply, 2937 pair_error, 2938 timeout=timeout * 1000) 2939 elif not self._is_connected(device): 2940 logging.info('Device is already paired. Connect.') 2941 self.dbus_method_with_handlers(device.Connect, 2942 connect_reply, 2943 connect_error, 2944 tiemout=timeout * 1000) 2945 except Exception as e: 2946 logging.error('Exception %s in pair_legacy_device', e) 2947 return False 2948 2949 return self._is_paired(device) and self._is_connected(device) 2950 2951 @dbus_safe(False) 2952 def remove_device_object(self, address): 2953 """Removes a device object and the pairing information. 2954 2955 Calls RemoveDevice method to remove remote device 2956 object and the pairing information. 2957 2958 @param address: Address of the device to unpair. 2959 2960 @returns: True on success. False otherwise. 2961 2962 """ 2963 device = self._find_device(address) 2964 if not device: 2965 logging.error('Device not found') 2966 return False 2967 self._adapter_proxy.RemoveDevice(self._get_device_path(address)) 2968 return True 2969 2970 @dbus_safe(False) 2971 def connect_device(self, address): 2972 """Connects a device. 2973 2974 Connects a device if it is not connected. 2975 2976 @param address: Address of the device to connect. 2977 2978 @returns: True on success. False otherwise. 2979 2980 """ 2981 device = self._find_device(address) 2982 if not device: 2983 logging.error('Device not found') 2984 return False 2985 if self._is_connected(device): 2986 logging.info('Device is already connected') 2987 return True 2988 device.Connect() 2989 return self._is_connected(device) 2990 2991 @dbus_safe(False) 2992 def device_is_connected(self, address): 2993 """Checks if a device is connected. 2994 2995 @param address: Address of the device to connect. 2996 2997 @returns: True if device is connected. False otherwise. 2998 2999 """ 3000 device = self._find_device(address) 3001 if not device: 3002 logging.error('Device not found') 3003 return False 3004 return self._is_connected(device) 3005 3006 @dbus_safe(False) 3007 def disconnect_device(self, address): 3008 """Disconnects a device. 3009 3010 Disconnects a device if it is connected. 3011 3012 @param address: Address of the device to disconnect. 3013 3014 @returns: True on success. False otherwise. 3015 3016 """ 3017 device = self._find_device(address) 3018 if not device: 3019 logging.error('Device not found') 3020 return False 3021 if not self._is_connected(device): 3022 logging.info('Device is not connected') 3023 return True 3024 device.Disconnect() 3025 return not self._is_connected(device) 3026 3027 @dbus_safe(False) 3028 def _device_services_resolved(self, device): 3029 """Checks if services are resolved. 3030 3031 @param device: An 'org.bluez.Device1' interface to the device. 3032 3033 @returns: True if device is connected. False otherwise. 3034 3035 """ 3036 logging.info('device for services resolved: %s', device) 3037 props = device[self.DBUS_PROP_IFACE] 3038 resolved = props.Get(self.BLUEZ_DEVICE_IFACE, 'ServicesResolved') 3039 logging.info('Services resolved = %r', resolved) 3040 return bool(resolved) 3041 3042 @dbus_safe(False) 3043 def device_services_resolved(self, address): 3044 """Checks if service discovery is complete on a device. 3045 3046 Checks whether service discovery has been completed.. 3047 3048 @param address: Address of the remote device. 3049 3050 @returns: True on success. False otherwise. 3051 3052 """ 3053 device = self._find_device(address) 3054 if not device: 3055 logging.error('Device not found') 3056 return False 3057 3058 if not self._is_connected(device): 3059 logging.info('Device is not connected') 3060 return False 3061 3062 return self._device_services_resolved(device) 3063 3064 def btmon_start(self): 3065 """Start btmon monitoring.""" 3066 self.btmon.start() 3067 3068 def btmon_stop(self): 3069 """Stop btmon monitoring.""" 3070 self.btmon.stop() 3071 3072 def btmon_get(self, search_str, start_str): 3073 """Get btmon output contents. 3074 3075 @param search_str: only lines with search_str would be kept. 3076 @param start_str: all lines before the occurrence of start_str would be 3077 filtered. 3078 3079 @returns: the recorded btmon output. 3080 3081 """ 3082 return self.btmon.get_contents(search_str=search_str, 3083 start_str=start_str) 3084 3085 def btmon_find(self, pattern_str): 3086 """Find if a pattern string exists in btmon output. 3087 3088 @param pattern_str: the pattern string to find. 3089 3090 @returns: True on success. False otherwise. 3091 3092 """ 3093 return self.btmon.find(pattern_str) 3094 3095 def dbus_method_with_handlers(self, dbus_method, reply_handler, 3096 error_handler, *args, **kwargs): 3097 """Run an async dbus method. 3098 3099 @param dbus_method: the dbus async method to invoke. 3100 @param reply_handler: the reply handler for the dbus method. 3101 @param error_handler: the error handler for the dbus method. 3102 @param *args: additional arguments for the dbus method. 3103 @param **kwargs: additional keyword arguments for the dbus method. 3104 3105 @returns: an empty string '' on success; 3106 None if there is no _advertising interface manager; and 3107 an error string if the dbus method fails or exception occurs 3108 3109 """ 3110 3111 def successful_cb(): 3112 """Called when the dbus_method completed successfully.""" 3113 reply_handler() 3114 self.dbus_cb_msg = '' 3115 3116 def error_cb(error): 3117 """Called when the dbus_method failed.""" 3118 error_handler(error) 3119 self.dbus_cb_msg = str(error) 3120 3121 # Successful dbus calls will have a non-throwing result and error 3122 # results will throw GLib.Error. 3123 try: 3124 _ = dbus_method(*args, **kwargs) 3125 successful_cb() 3126 except GLib.Error as e: 3127 error_cb(e) 3128 except Exception as e: 3129 logging.error('Exception %s in dbus_method_with_handlers ', e) 3130 return str(e) 3131 3132 return self.dbus_cb_msg 3133 3134 def advmon_check_manager_interface_exist(self): 3135 """Check if AdvertisementMonitorManager1 interface is available. 3136 3137 @returns: True if Manager interface is available, False otherwise. 3138 3139 """ 3140 objects = self._objmgr_proxy.GetManagedObjects() 3141 for _, ifaces in six.iteritems(objects): 3142 if self.BLUEZ_ADV_MONITOR_MANAGER_IFACE in ifaces: 3143 return True 3144 3145 return False 3146 3147 def advmon_read_supported_types(self): 3148 """Read the Advertisement Monitor supported monitor types. 3149 3150 Reads the value of 'SupportedMonitorTypes' property of the 3151 AdvertisementMonitorManager1 interface on the adapter. 3152 3153 @returns: the list of the supported monitor types. 3154 3155 """ 3156 return unpack_if_variant( 3157 self._property_proxy.Get(self.BLUEZ_ADV_MONITOR_MANAGER_IFACE, 3158 'SupportedMonitorTypes')) 3159 3160 def advmon_read_supported_features(self): 3161 """Read the Advertisement Monitor supported features. 3162 3163 Reads the value of 'SupportedFeatures' property of the 3164 AdvertisementMonitorManager1 interface on the adapter. 3165 3166 @returns: the list of the supported features. 3167 3168 """ 3169 return unpack_if_variant( 3170 self._property_proxy.Get(self.BLUEZ_ADV_MONITOR_MANAGER_IFACE, 3171 'SupportedFeatures')) 3172 3173 def advmon_create_app(self): 3174 """Create an advertisement monitor app. 3175 3176 @returns: app id, once the app is created. 3177 3178 """ 3179 return self.advmon_appmgr.create_app() 3180 3181 def advmon_exit_app(self, app_id): 3182 """Exit an advertisement monitor app. 3183 3184 @param app_id: the app id. 3185 3186 @returns: True on success, False otherwise. 3187 3188 """ 3189 return self.advmon_appmgr.exit_app(app_id) 3190 3191 def advmon_kill_app(self, app_id): 3192 """Kill an advertisement monitor app by sending SIGKILL. 3193 3194 @param app_id: the app id. 3195 3196 @returns: True on success, False otherwise. 3197 3198 """ 3199 return self.advmon_appmgr.kill_app(app_id) 3200 3201 def advmon_register_app(self, app_id): 3202 """Register an advertisement monitor app. 3203 3204 @param app_id: the app id. 3205 3206 @returns: True on success, False otherwise. 3207 3208 """ 3209 return self.advmon_appmgr.register_app(app_id) 3210 3211 def advmon_unregister_app(self, app_id): 3212 """Unregister an advertisement monitor app. 3213 3214 @param app_id: the app id. 3215 3216 @returns: True on success, False otherwise. 3217 3218 """ 3219 return self.advmon_appmgr.unregister_app(app_id) 3220 3221 def advmon_add_monitor(self, app_id, monitor_data): 3222 """Create an Advertisement Monitor object. 3223 3224 @param app_id: the app id. 3225 @param monitor_data: the list containing monitor type, RSSI filter 3226 values and patterns. 3227 3228 @returns: monitor id, once the monitor is created, None otherwise. 3229 3230 """ 3231 return self.advmon_appmgr.add_monitor(app_id, monitor_data) 3232 3233 def advmon_remove_monitor(self, app_id, monitor_id): 3234 """Remove the Advertisement Monitor object. 3235 3236 @param app_id: the app id. 3237 @param monitor_id: the monitor id. 3238 3239 @returns: True on success, False otherwise. 3240 3241 """ 3242 return self.advmon_appmgr.remove_monitor(app_id, monitor_id) 3243 3244 def advmon_get_event_count(self, app_id, monitor_id, event): 3245 """Read the count of a particular event on the given monitor. 3246 3247 @param app_id: the app id. 3248 @param monitor_id: the monitor id. 3249 @param event: name of the specific event or 'All' for all events. 3250 3251 @returns: count of the specific event or dict of counts of all events. 3252 3253 """ 3254 return self.advmon_appmgr.get_event_count(app_id, monitor_id, event) 3255 3256 def advmon_reset_event_count(self, app_id, monitor_id, event): 3257 """Reset the count of a particular event on the given monitor. 3258 3259 @param app_id: the app id. 3260 @param monitor_id: the monitor id. 3261 @param event: name of the specific event or 'All' for all events. 3262 3263 @returns: True on success, False otherwise. 3264 3265 """ 3266 return self.advmon_appmgr.reset_event_count(app_id, monitor_id, event) 3267 3268 def advmon_set_target_devices(self, app_id, monitor_id, devices): 3269 """Set the target devices to the given monitor. 3270 3271 DeviceFound and DeviceLost will only be counted if it is triggered by a 3272 target device. 3273 3274 @param app_id: the app id. 3275 @param monitor_id: the monitor id. 3276 @param devices: a list of devices in MAC address 3277 3278 @returns: True on success, False otherwise. 3279 3280 """ 3281 paths = [] 3282 for addr in devices: 3283 paths.append('{}/dev_{}'.format(self._adapter_path, 3284 addr.replace(':', '_'))) 3285 3286 return self.advmon_appmgr.set_target_devices(app_id, monitor_id, paths) 3287 3288 def advmon_interleave_scan_logger_start(self): 3289 """ Start interleave logger recording 3290 """ 3291 self.advmon_interleave_logger.StartRecording() 3292 3293 def advmon_interleave_scan_logger_stop(self): 3294 """ Stop interleave logger recording 3295 3296 @returns: True if logs were successfully collected, 3297 False otherwise. 3298 3299 """ 3300 return self.advmon_interleave_logger.StopRecording() 3301 3302 def advmon_interleave_scan_logger_get_records(self): 3303 """ Get records in previous log collections 3304 3305 @returns: a list of records, where each item is a record of 3306 interleave |state| and the |time| the state starts. 3307 |state| could be {'no filter', 'allowlist'} 3308 |time| is system time in sec 3309 3310 """ 3311 return self.advmon_interleave_logger.records 3312 3313 def advmon_interleave_scan_logger_get_cancel_events(self): 3314 """ Get cancel events in previous log collections 3315 3316 @returns: a list of cancel |time| when a interleave cancel event log 3317 was found. 3318 |time| is system time in sec 3319 3320 """ 3321 return self.advmon_interleave_logger.cancel_events 3322 3323 def register_advertisement(self, advertisement_data): 3324 """Register an advertisement. 3325 3326 Note that rpc supports only conformable types. Hence, a 3327 dict about the advertisement is passed as a parameter such 3328 that the advertisement object could be constructed on the host. 3329 3330 @param advertisement_data: a dict of the advertisement to register. 3331 3332 @returns: True on success. False otherwise. 3333 3334 """ 3335 adv = advertisement.Advertisement(self.bus, advertisement_data) 3336 self.advertisements.append(adv) 3337 return self.dbus_method_with_handlers( 3338 self._advertising.RegisterAdvertisement, 3339 # reply handler 3340 lambda: logging.info('register_advertisement: succeeded.'), 3341 # error handler 3342 lambda error: logging.error( 3343 'register_advertisement: failed: %s', str(error)), 3344 # other arguments 3345 adv.get_path(), 3346 {}) 3347 3348 def unregister_advertisement(self, advertisement_data): 3349 """Unregister an advertisement. 3350 3351 Note that to unregister an advertisement, it is required to use 3352 the same self._advertising interface manager. This is because 3353 bluez only allows the same sender to invoke UnregisterAdvertisement 3354 method. Hence, watch out that the bluetoothd is not restarted or 3355 self.start_bluetoothd() is not executed between the time span that 3356 an advertisement is registered and unregistered. 3357 3358 @param advertisement_data: a dict of the advertisements to unregister. 3359 3360 @returns: True on success. False otherwise. 3361 3362 """ 3363 path = advertisement_data.get('Path') 3364 for index, adv in enumerate(self.advertisements): 3365 if adv.get_path() == path: 3366 break 3367 else: 3368 logging.error('Fail to find the advertisement under the path: %s', 3369 path) 3370 return False 3371 3372 result = self.dbus_method_with_handlers( 3373 self._advertising.UnregisterAdvertisement, 3374 # reply handler 3375 lambda: logging.info('unregister_advertisement: succeeded.'), 3376 # error handler 3377 lambda error: logging.error( 3378 'unregister_advertisement: failed: %s', str(error)), 3379 # other arguments 3380 adv.get_path()) 3381 3382 # Call unregister() so that the same path could be reused. 3383 adv.unregister() 3384 del self.advertisements[index] 3385 3386 return result 3387 3388 def set_advertising_intervals(self, min_adv_interval_ms, 3389 max_adv_interval_ms): 3390 """Set advertising intervals. 3391 3392 @param min_adv_interval_ms: the min advertising interval in ms. 3393 @param max_adv_interval_ms: the max advertising interval in ms. 3394 3395 @returns: True on success. False otherwise. 3396 3397 """ 3398 return self.dbus_method_with_handlers( 3399 self._advertising.SetAdvertisingIntervals, 3400 # reply handler 3401 lambda: logging.info('set_advertising_intervals: succeeded.'), 3402 # error handler 3403 lambda error: logging.error( 3404 'set_advertising_intervals: failed: %s', str(error)), 3405 # other arguments 3406 min_adv_interval_ms, 3407 max_adv_interval_ms) 3408 3409 def get_advertisement_property(self, adv_path, prop_name): 3410 """Grab property of an advertisement registered on the DUT 3411 3412 The service on the DUT registers a dbus object and holds it. During the 3413 test, some properties on the object may change, so this allows the test 3414 access to the properties at run-time. 3415 3416 @param adv_path: string path of the dbus object 3417 @param prop_name: string name of the property required 3418 3419 @returns: the value of the property in standard (non-dbus) type if the 3420 property exists, else None 3421 """ 3422 for adv in self.advertisements: 3423 if str(adv.get_path()) == adv_path: 3424 adv_props = adv.GetAll('org.bluez.LEAdvertisement1') 3425 return unpack_if_variant(adv_props.get(prop_name, None)) 3426 3427 return None 3428 3429 def get_advertising_manager_property(self, prop_name): 3430 """Grab property of the bluez advertising manager 3431 3432 This allows us to understand the DUT's advertising capabilities, for 3433 instance the maximum number of advertising instances supported, so that 3434 we can test these capabilities. 3435 3436 @param adv_path: string path of the dbus object 3437 @param prop_name: string name of the property required 3438 3439 @returns: the value of the property in standard (non-dbus) type if the 3440 property exists, else None 3441 """ 3442 3443 return unpack_if_variant( 3444 self._property_proxy.Get( 3445 self.BLUEZ_LE_ADVERTISING_MANAGER_IFACE, prop_name)) 3446 3447 def reset_advertising(self): 3448 """Reset advertising. 3449 3450 This includes un-registering all advertisements, reset advertising 3451 intervals, and disable advertising. 3452 3453 @returns: True on success. False otherwise. 3454 3455 """ 3456 # It is required to execute unregister() to unregister the 3457 # object-path handler of each advertisement. In this way, we could 3458 # register an advertisement with the same path repeatedly. 3459 for adv in self.advertisements: 3460 adv.unregister() 3461 del self.advertisements[:] 3462 3463 return self.dbus_method_with_handlers( 3464 self._advertising.ResetAdvertising, 3465 # reply handler 3466 lambda: logging.info('reset_advertising: succeeded.'), 3467 # error handler 3468 lambda error: logging.error('reset_advertising: failed: %s', 3469 str(error))) 3470 3471 def get_gatt_attributes_map(self, address): 3472 """Return a JSON formatted string of the GATT attributes of a device, 3473 keyed by UUID 3474 @param address: a string of the MAC address of the device 3475 3476 @return: JSON formated string, stored the nested structure of the 3477 attributes. Each attribute has 'path' and 3478 ['characteristics' | 'descriptors'], which store their object path and 3479 children respectively. 3480 3481 """ 3482 attribute_map = dict() 3483 3484 device_object_path = self._get_device_path(address) 3485 objects = self._objmgr_proxy.GetManagedObjects() 3486 service_map = self._get_service_map(device_object_path, objects) 3487 3488 servs = dict() 3489 attribute_map['services'] = servs 3490 3491 for uuid, path in service_map.items(): 3492 3493 servs[uuid] = dict() 3494 serv = servs[uuid] 3495 3496 serv['path'] = path 3497 serv['characteristics'] = dict() 3498 chrcs = serv['characteristics'] 3499 3500 chrcs_map = self._get_characteristic_map(path, objects) 3501 for uuid, path in chrcs_map.items(): 3502 chrcs[uuid] = dict() 3503 chrc = chrcs[uuid] 3504 3505 chrc['path'] = path 3506 chrc['descriptors'] = dict() 3507 descs = chrc['descriptors'] 3508 3509 descs_map = self._get_descriptor_map(path, objects) 3510 3511 for uuid, path in descs_map.items(): 3512 descs[uuid] = dict() 3513 desc = descs[uuid] 3514 3515 desc['path'] = path 3516 3517 return json.dumps(attribute_map) 3518 3519 def _get_gatt_interface(self, uuid, object_path, interface): 3520 """Get dbus interface by uuid 3521 @param uuid: a string of uuid 3522 @param object_path: a string of the object path of the service 3523 3524 @return: a dbus interface 3525 """ 3526 3527 return self.bus.get(self.BLUEZ_SERVICE_NAME, object_path)[interface] 3528 3529 def get_gatt_service_property(self, object_path, property_name): 3530 """Get property from a service attribute 3531 @param object_path: a string of the object path of the service 3532 @param property_name: a string of a property, ex: 'Value', 'UUID' 3533 3534 @return: the property if success, 3535 none otherwise 3536 3537 """ 3538 return self.get_gatt_attribute_property(object_path, 3539 self.BLUEZ_GATT_SERV_IFACE, 3540 property_name) 3541 3542 def get_gatt_characteristic_property(self, object_path, property_name): 3543 """Get property from a characteristic attribute 3544 @param object_path: a string of the object path of the characteristic 3545 @param property_name: a string of a property, ex: 'Value', 'UUID' 3546 3547 @return: the property if success, 3548 none otherwise 3549 3550 """ 3551 return self.get_gatt_attribute_property(object_path, 3552 self.BLUEZ_GATT_CHAR_IFACE, 3553 property_name) 3554 3555 def get_gatt_descriptor_property(self, object_path, property_name): 3556 """Get property from descriptor attribute 3557 @param object_path: a string of the object path of the descriptor 3558 @param property_name: a string of a property, ex: 'Value', 'UUID' 3559 3560 @return: the property if success, 3561 none otherwise 3562 3563 """ 3564 return self.get_gatt_attribute_property(object_path, 3565 self.BLUEZ_GATT_DESC_IFACE, 3566 property_name) 3567 3568 @dbus_safe(None) 3569 def get_gatt_attribute_property(self, object_path, interface, 3570 property_name): 3571 """Get property from attribute 3572 @param object_path: a string of the bject path 3573 @param property_name: a string of a property, ex: 'Value', 'UUID' 3574 3575 @return: the property if success, 3576 none otherwise 3577 3578 """ 3579 gatt_object = self.bus.get(self.BLUEZ_SERVICE_NAME, object_path) 3580 prop = self._get_dbus_object_property(gatt_object, interface, 3581 property_name) 3582 logging.info(prop) 3583 if isinstance(prop, bytearray): 3584 return _dbus_byte_array_to_b64_string(prop) 3585 if isinstance(prop, bool): 3586 return bool(prop) 3587 if isinstance(prop, list): 3588 return list(map(str, prop)) 3589 return prop 3590 3591 @dbus_safe(None) 3592 def gatt_characteristic_read_value(self, uuid, object_path): 3593 """Perform method ReadValue on a characteristic attribute 3594 @param uuid: a string of uuid 3595 @param object_path: a string of the object path of the characteristic 3596 3597 @return: base64 string of dbus bytearray 3598 """ 3599 3600 dbus_interface = self._get_gatt_interface(uuid, object_path, 3601 self.BLUEZ_GATT_CHAR_IFACE) 3602 value = dbus_interface.ReadValue({}) 3603 return _dbus_byte_array_to_b64_string(value) 3604 3605 @dbus_safe(None) 3606 def gatt_descriptor_read_value(self, uuid, object_path): 3607 """Perform method ReadValue on a descriptor attribute 3608 @param uuid: a string of uuid 3609 @param object_path: a string of the object path of the descriptor 3610 3611 @return: base64 string of dbus bytearray 3612 """ 3613 3614 dbus_interface = self._get_gatt_interface(uuid, object_path, 3615 self.BLUEZ_GATT_DESC_IFACE) 3616 value = dbus_interface.ReadValue({}) 3617 return _dbus_byte_array_to_b64_string(value) 3618 3619 @dbus_safe(False) 3620 def _get_attribute_map(self, object_path, dbus_interface, objects): 3621 """Gets a map of object paths under an object path. 3622 3623 Walks the object tree, and returns a map of UUIDs to object paths for 3624 all resolved gatt object. 3625 3626 @param object_path: The object path of the attribute to retrieve 3627 gatt UUIDs and paths from. 3628 @param objects: The managed objects. 3629 3630 @returns: A dictionary of object paths, keyed by UUID. 3631 3632 """ 3633 attr_map = {} 3634 3635 if object_path: 3636 for path, ifaces in six.iteritems(objects): 3637 if (dbus_interface in ifaces and path.startswith(object_path)): 3638 uuid = ifaces[dbus_interface]['UUID'].lower() 3639 attr_map[uuid] = path 3640 3641 else: 3642 logging.warning('object_path %s is not valid', object_path) 3643 3644 return attr_map 3645 3646 def _get_service_map(self, device_path, objects): 3647 """Gets a map of service paths for a device. 3648 3649 @param device_path: the object path of the device. 3650 @param objects: The managed objects. 3651 """ 3652 return self._get_attribute_map(device_path, self.BLUEZ_GATT_SERV_IFACE, 3653 objects) 3654 3655 def _get_characteristic_map(self, serv_path, objects): 3656 """Gets a map of characteristic paths for a service. 3657 3658 @param serv_path: the object path of the service. 3659 @param objects: The managed objects. 3660 """ 3661 return self._get_attribute_map(serv_path, self.BLUEZ_GATT_CHAR_IFACE, 3662 objects) 3663 3664 def _get_descriptor_map(self, chrc_path, objects): 3665 """Gets a map of descriptor paths for a characteristic. 3666 3667 @param chrc_path: the object path of the characteristic. 3668 @param objects: The managed objects. 3669 """ 3670 return self._get_attribute_map(chrc_path, self.BLUEZ_GATT_DESC_IFACE, 3671 objects) 3672 3673 @dbus_safe(None) 3674 def _get_dbus_object_property(self, dbus_object, dbus_interface, 3675 dbus_property): 3676 """Get the property in an object. 3677 3678 @param dbus_object: a dbus object 3679 @param dbus_interface: a dbus interface where the property exists 3680 @param dbus_property: a dbus property of the dbus object, as a string 3681 3682 @return: dbus type object if it success, e.g. dbus.Boolean, dbus.String, 3683 none otherwise 3684 3685 """ 3686 return dbus_object[self.DBUS_PROP_IFACE].Get(dbus_interface, 3687 dbus_property) 3688 3689 @dbus_safe(False) 3690 def get_characteristic_map(self, address): 3691 """Gets a map of characteristic paths for a device. 3692 3693 Walks the object tree, and returns a map of uuids to object paths for 3694 all resolved gatt characteristics. 3695 3696 @param address: The MAC address of the device to retrieve 3697 gatt characteristic uuids and paths from. 3698 3699 @returns: A dictionary of characteristic paths, keyed by uuid. 3700 3701 """ 3702 device_path = self._get_device_path(address) 3703 char_map = {} 3704 3705 if device_path: 3706 objects = self._objmgr_proxy.GetManagedObjects() 3707 3708 for path, ifaces in six.iteritems(objects): 3709 if (self.BLUEZ_GATT_CHAR_IFACE in ifaces 3710 and path.startswith(device_path)): 3711 uuid = ifaces[self.BLUEZ_GATT_CHAR_IFACE]['UUID'].lower() 3712 char_map[uuid] = path 3713 else: 3714 logging.warning('Device %s not in object tree.', address) 3715 3716 return char_map 3717 3718 @dbus_safe(None) 3719 def _get_char_object(self, uuid, address): 3720 """Gets a characteristic object. 3721 3722 Gets a characteristic object for a given UUID and address. 3723 3724 @param uuid: The UUID of the characteristic, as a string. 3725 @param address: The MAC address of the remote device. 3726 3727 @returns: A dbus interface for the characteristic if the uuid/address 3728 is in the object tree. 3729 None if the address/uuid is not found in the object tree. 3730 3731 """ 3732 path = self.get_characteristic_map(address).get(uuid) 3733 if not path: 3734 logging.error("path not found: %s %s", uuid, address) 3735 return None 3736 return self.bus.get(self.BLUEZ_SERVICE_NAME, 3737 path)[self.BLUEZ_GATT_CHAR_IFACE] 3738 3739 @dbus_safe(None) 3740 def read_characteristic(self, uuid, address): 3741 """Reads the value of a gatt characteristic. 3742 3743 Reads the current value of a gatt characteristic. Base64 endcoding is 3744 used for compatibility with the XML RPC interface. 3745 3746 @param uuid: The uuid of the characteristic to read, as a string. 3747 @param address: The MAC address of the remote device. 3748 3749 @returns: A b64 encoded version of a byte array containing the value 3750 if the uuid/address is in the object tree. 3751 None if the uuid/address was not found in the object tree, or 3752 if a DBus exception was raised by the read operation. 3753 3754 """ 3755 char_obj = self._get_char_object(uuid, address) 3756 if char_obj is None: 3757 return None 3758 value = char_obj.ReadValue({}) 3759 return _dbus_byte_array_to_b64_string(value) 3760 3761 @dbus_safe(None) 3762 def write_characteristic(self, uuid, address, value): 3763 """Performs a write operation on a gatt characteristic. 3764 3765 Writes to a GATT characteristic on a remote device. Base64 endcoding is 3766 used for compatibility with the XML RPC interface. 3767 3768 @param uuid: The uuid of the characteristic to write to, as a string. 3769 @param address: The MAC address of the remote device, as a string. 3770 @param value: A byte array containing the data to write. 3771 3772 @returns: True if the write operation does not raise an exception. 3773 None if the uuid/address was not found in the object tree, or 3774 if a DBus exception was raised by the write operation. 3775 3776 """ 3777 char_obj = self._get_char_object(uuid, address) 3778 if char_obj is None: 3779 return None 3780 dbus_value = _b64_string_to_dbus_byte_array(value) 3781 char_obj.WriteValue(dbus_value, {}) 3782 return True 3783 3784 @dbus_safe(None) 3785 def exchange_messages(self, tx_object_path, rx_object_path, value): 3786 """Performs a write operation on a gatt characteristic and wait for 3787 the response on another characteristic. 3788 3789 @param tx_object_path: the object path of the characteristic to write. 3790 @param rx_object_path: the object path of the characteristic to read. 3791 @param value: A byte array containing the data to write. 3792 3793 @returns: The value of the characteristic to read from. 3794 None if the uuid/address was not found in the object tree, or 3795 if a DBus exception was raised by the write operation. 3796 3797 """ 3798 tx_obj = self._get_gatt_characteristic_object(tx_object_path) 3799 3800 if tx_obj is None: 3801 return None 3802 3803 self._chrc_property = ''.encode('utf-8') 3804 3805 value = str(value) 3806 proxy = self.bus.get(self.BLUEZ_SERVICE_NAME, rx_object_path)[self.DBUS_PROP_IFACE] 3807 self._signal_watch = proxy.PropertiesChanged.connect(self._property_changed) 3808 3809 # Start timeout source 3810 self._timeout_start = time.time() 3811 self._timeout_early = False 3812 self._timeout_id = GObject.timeout_add( 3813 self.PROPERTY_UPDATE_CHECK_MILLI_SECS, 3814 self._property_wait_timeout) 3815 3816 write_value = _b64_string_to_dbus_byte_array(value) 3817 tx_obj.WriteValue(write_value, {}) 3818 3819 self._dbus_mainloop.run() 3820 3821 return _dbus_byte_array_to_b64_string(self._chrc_property) 3822 3823 def _property_changed(self, *args, **kwargs): 3824 """Handler for properties changed signal.""" 3825 # We don't cancel the timeout here due to a problem with the GLib 3826 # mainloop. See |_property_wait_timeout| for a full explanation. 3827 self._timeout_early = True 3828 self._signal_watch.disconnect() 3829 changed_prop = args 3830 3831 logging.info(changed_prop) 3832 prop_dict = changed_prop[1] 3833 self._chrc_property = prop_dict['Value'] 3834 if self._dbus_mainloop.is_running(): 3835 self._dbus_mainloop.quit() 3836 3837 def _property_wait_timeout(self): 3838 """Timeout handler when waiting for properties update signal.""" 3839 # Sometimes, GLib.Mainloop doesn't exit after |mainloop.quit()| is 3840 # called. This seems to occur only if a timeout source was active and 3841 # was removed before it had a chance to run. To mitigate this, we don't 3842 # cancel the timeout but mark an early completion instead. 3843 # See b/222364364#comment3 for more information. 3844 if not self._timeout_early and int( 3845 (time.time() - self._timeout_start) * 3846 1000) <= self.PROPERTY_UPDATE_TIMEOUT_MILLI_SECS: 3847 # Returning True means this will be called again. 3848 return True 3849 3850 self._signal_watch.disconnect() 3851 if self._dbus_mainloop.is_running(): 3852 logging.warning("quit main loop due to timeout") 3853 self._dbus_mainloop.quit() 3854 # Return false so that this method will not be called again. 3855 return False 3856 3857 @dbus_safe(False) 3858 def _get_gatt_characteristic_object(self, object_path): 3859 return self.bus.get(self.BLUEZ_SERVICE_NAME, 3860 object_path)[self.BLUEZ_GATT_CHAR_IFACE] 3861 3862 @dbus_safe(False) 3863 def start_notify(self, object_path, cccd_value): 3864 """Starts the notification session on the gatt characteristic. 3865 3866 @param object_path: the object path of the characteristic. 3867 @param cccd_value: Possible CCCD values include 3868 0x00 - inferred from the remote characteristic's properties 3869 0x01 - notification 3870 0x02 - indication 3871 3872 @returns: True if the operation succeeds. 3873 False if the characteristic is not found, or 3874 if a DBus exception was raised by the operation. 3875 3876 """ 3877 char_obj = self._get_gatt_characteristic_object(object_path) 3878 if char_obj is None: 3879 logging.error("characteristic not found: %s %s", object_path) 3880 return False 3881 3882 try: 3883 char_obj.StartNotify(cccd_value) 3884 return True 3885 except Exception as e: 3886 logging.error('start_notify: %s', e) 3887 except: 3888 logging.error('start_notify: unexpected error') 3889 return False 3890 3891 @dbus_safe(False) 3892 def stop_notify(self, object_path): 3893 """Stops the notification session on the gatt characteristic. 3894 3895 @param object_path: the object path of the characteristic. 3896 3897 @returns: True if the operation succeeds. 3898 False if the characteristic is not found, or 3899 if a DBus exception was raised by the operation. 3900 3901 """ 3902 char_obj = self._get_gatt_characteristic_object(object_path) 3903 if char_obj is None: 3904 logging.error("characteristic not found: %s %s", object_path) 3905 return False 3906 3907 try: 3908 char_obj.StopNotify() 3909 return True 3910 except Exception as e: 3911 logging.error('stop_notify: %s', e) 3912 except: 3913 logging.error('stop_notify: unexpected error') 3914 return False 3915 3916 @dbus_safe(False) 3917 def is_notifying(self, object_path): 3918 """Is the GATT characteristic in a notifying session? 3919 3920 @param object_path: the object path of the characteristic. 3921 3922 @return True if it is in a notification session. False otherwise. 3923 3924 """ 3925 3926 return self.get_gatt_characteristic_property(object_path, 'Notifying') 3927 3928 @dbus_safe(False) 3929 def is_characteristic_path_resolved(self, uuid, address): 3930 """Checks whether a characteristic is in the object tree. 3931 3932 Checks whether a characteristic is curently found in the object tree. 3933 3934 @param uuid: The uuid of the characteristic to search for. 3935 @param address: The MAC address of the device on which to search for 3936 the characteristic. 3937 3938 @returns: True if the characteristic is found. 3939 False if the characteristic path is not found. 3940 3941 """ 3942 return bool(self.get_characteristic_map(address).get(uuid)) 3943 3944 @dbus_safe(False) 3945 def get_connection_info(self, address): 3946 """Get device connection info. 3947 3948 @param address: The MAC address of the device. 3949 3950 @returns: On success, a JSON-encoded tuple of: 3951 ( RSSI, transmit_power, max_transmit_power ) 3952 None otherwise. 3953 3954 """ 3955 plugin_device = self._get_plugin_device_interface(address) 3956 if plugin_device is None: 3957 return None 3958 3959 try: 3960 connection_info = plugin_device.GetConnInfo() 3961 return json.dumps(connection_info) 3962 except Exception as e: 3963 logging.error('get_connection_info: %s', e) 3964 except: 3965 logging.error('get_connection_info: unexpected error') 3966 return None 3967 3968 def has_connection_info(self, address): 3969 """Checks whether the address has connection info. 3970 3971 @param address: The MAC address of the device. 3972 @returns True if connection info can be found. 3973 """ 3974 return self.get_connection_info(address) is not None 3975 3976 @dbus_safe(False) 3977 def set_le_connection_parameters(self, address, parameters): 3978 """Set the LE connection parameters. 3979 3980 @param address: The MAC address of the device. 3981 @param parameters: The LE connection parameters to set. 3982 3983 @return: True on success. False otherwise. 3984 3985 """ 3986 plugin_device = self._get_plugin_device_interface(address) 3987 if plugin_device is None: 3988 return False 3989 3990 return not self.dbus_method_with_handlers( 3991 plugin_device.SetLEConnectionParameters, 3992 # reply handler 3993 lambda: logging.info('set_le_connection_parameters: succeeded.' 3994 ), 3995 # error handler 3996 lambda error: logging. 3997 error('set_le_connection_parameters: failed: %s', str(error)), 3998 # other arguments 3999 parameters) 4000 4001 @dbus_safe(False) 4002 def _get_plugin_device_interface(self, address): 4003 """Get the BlueZ Chromium device plugin interface. 4004 4005 This interface can be used to issue dbus requests such as 4006 GetConnInfo and SetLEConnectionParameters. 4007 4008 @param address: The MAC address of the device. 4009 4010 @return: On success, the BlueZ Chromium device plugin interface 4011 None otherwise. 4012 4013 """ 4014 path = self._get_device_path(address) 4015 if path is None: 4016 return None 4017 4018 return self.bus.get(self.BLUEZ_SERVICE_NAME, 4019 path)[self.BLUEZ_PLUGIN_DEVICE_IFACE] 4020 4021 @dbus_safe(False) 4022 def policy_get_service_allow_list(self): 4023 """Get the service allow list for enterprise policy. 4024 4025 @returns: array of strings representing the allowed service UUIDs. 4026 """ 4027 uuids = unpack_if_variant( 4028 self._property_proxy.Get(self.BLUEZ_ADMIN_POLICY_STATUS_IFACE, 4029 'ServiceAllowList')) 4030 logging.debug('ServiceAllowList: %s', uuids) 4031 return uuids 4032 4033 @dbus_safe(False, return_error=True) 4034 def policy_set_service_allow_list(self, uuids): 4035 """Set the service allow list for enterprise policy. 4036 4037 @param uuids: a string representing the uuids; e.g., "1234,0xabcd" or "" 4038 4039 @returns: (True, '') on success, (False, '<error>') on failure. 4040 """ 4041 dbus_array = [] 4042 if bool(uuids.strip()): 4043 for uuid in uuids.split(','): 4044 dbus_array.append(uuid.strip()) 4045 4046 logging.debug('policy_set_service_allow_list: %s', dbus_array) 4047 self._adapter[self.BLUEZ_ADMIN_POLICY_SET_IFACE].SetServiceAllowList( 4048 dbus_array) 4049 return (True, '') 4050 4051 @dbus_safe(False, return_error=True) 4052 def policy_get_device_affected(self, device_address): 4053 """Check if the device is affected by enterprise policy. 4054 4055 @param device_address: address of the device 4056 e.g. '6C:29:95:1A:D4:6F' 4057 4058 @returns: True if the device is affected by the enterprise policy. 4059 False if not. None if the device is not found. 4060 """ 4061 device = self._find_device(device_address) 4062 if not device: 4063 logging.debug('Failed to find device %s', device_address) 4064 return None 4065 4066 affected = unpack_if_variant(device[self.DBUS_PROP_IFACE].Get( 4067 self.BLUEZ_ADMIN_POLICY_STATUS_IFACE, 'AffectedByPolicy')) 4068 logging.debug('policy_get_device_affected(%s): %s', device_address, 4069 affected) 4070 return affected 4071 4072 def cleanup(self): 4073 """Cleanup before exiting the client xmlrpc process.""" 4074 4075 self.advmon_appmgr.destroy() 4076 4077 def get_sysconfig(self): 4078 """Helper function to get default controller parameters 4079 4080 @returns: dict of type to values, both are in string form, 4081 None if the operation read-sysconfig failed. 4082 """ 4083 tlv_re = re.compile('Type: (0x[0-9A-Fa-f]{4})\s+' 4084 'Length: ([0-9A-Fa-f]{2})\s+' 4085 'Value: ([0-9A-Fa-f]+)') 4086 4087 cmd = 'btmgmt read-sysconfig' 4088 # btmgmt needs stdin, otherwise it won't output anything. 4089 # Please refer to 4090 # third_party/bluez/current/src/shared/shell.c:bt_shell_printf 4091 # for more information 4092 output = subprocess.check_output(cmd.split(), 4093 stdin=subprocess.PIPE, 4094 encoding='UTF-8') 4095 4096 if output is None: 4097 logging.warning('Unable to retrieve output of %s', cmd) 4098 return None 4099 4100 sysconfig = dict() 4101 4102 for line in output.splitlines(): 4103 try: 4104 m = tlv_re.match(line) 4105 t, l, v = m.groups() 4106 sysconfig[int(t, 16)] = v 4107 except Exception as e: 4108 logging.warning('Unexpected error %s at "%s"', str(e), line) 4109 4110 logging.debug("default controller parameters: %s", sysconfig) 4111 return sysconfig 4112 4113 def _le_hex_to_int(self, le_hex): 4114 """Convert a little-endian hex-string to an unsigned integer. 4115 For example, _le_hex_to_int('0x0102') returns the same value as 4116 int('0201', 16) 4117 """ 4118 if le_hex is None: 4119 return None 4120 4121 ba = bytearray.fromhex(le_hex) 4122 ba.reverse() 4123 return int(binascii.hexlify(ba), 16) 4124 4125 def get_advmon_interleave_durations(self): 4126 """Get durations of allowlist scan and no filter scan 4127 4128 @returns: a dict of {'allowlist': allowlist_duration, 4129 'no filter': no_filter_duration}, 4130 or None if something went wrong 4131 """ 4132 4133 sysconfig = self.get_sysconfig() 4134 4135 if sysconfig is None: 4136 return None 4137 4138 AllowlistScanDuration = self._le_hex_to_int(sysconfig.get( 4139 0x001d, None)) 4140 NoFilterScanDuration = self._le_hex_to_int(sysconfig.get(0x001e, None)) 4141 4142 return { 4143 'allowlist': AllowlistScanDuration, 4144 'no filter': NoFilterScanDuration 4145 } 4146 4147 4148class FlossFacadeLocal(BluetoothBaseFacadeLocal): 4149 """Exposes DUT methods called remotely during Bluetooth autotests for the 4150 Floss daemon. 4151 4152 All instance methods of this object without a preceding '_' are exposed via 4153 an XML-RPC server. This is not a stateless handler object, which means that 4154 if you store state inside the delegate, that state will remain around for 4155 future calls. 4156 """ 4157 4158 # Default to this adapter during init. We will initialize to the correct 4159 # default adapter after the manager client is initialized. 4160 DEFAULT_ADAPTER = 0 4161 4162 # How long we wait for the adapter to come up after we start it 4163 ADAPTER_DAEMON_TIMEOUT_SEC = 20 4164 4165 # Floss stops discovery after ~12s after starting. To improve discovery 4166 # chances in tests, we need to keep restarting discovery. This timeout 4167 # tracks how long an overall discovery session should be. 4168 DISCOVERY_TIMEOUT_SEC = 60 4169 4170 class DiscoveryObserver(BluetoothCallbacks): 4171 """ Discovery observer that restarts discovery until a timeout. 4172 4173 By default, the Floss stack stops discovery after ~12s. This can be an 4174 insufficient amount of time to discover a device, especially classic 4175 devices. To mimic Bluez, we have this observer restart discovery each 4176 time it is stopped up until a given timeout. 4177 """ 4178 4179 def __init__(self, adapter_client, timeout_secs): 4180 """Constructor. 4181 4182 @param adapter_client: Already initialized client instance. 4183 @param timeout_secs: How long to continue refreshing discovery. 4184 """ 4185 self.adapter_client = adapter_client 4186 self.deadline = datetime.now() + timedelta(seconds=timeout_secs) 4187 self.adapter_client.register_callback_observer( 4188 'DiscoveryObserver', self) 4189 self.discovering = None 4190 4191 def __del__(self): 4192 if self.adapter_client: 4193 self.cleanup() 4194 4195 def cleanup(self): 4196 """Clean up after this observer.""" 4197 self.adapter_client.unregister_callback_observer( 4198 'DiscoveryObserver', self) 4199 self.adapter_client = None 4200 4201 def on_discovering_changed(self, discovering): 4202 """Discovering has changed.""" 4203 4204 logging.info('Discovering changed to %s', discovering) 4205 4206 prev = self.discovering 4207 self.discovering = discovering 4208 4209 # No-op if this is the same notification sent multiple times 4210 if prev == discovering: 4211 pass 4212 # If discovering ended, check if the observer has timed out yet. If 4213 # not, re-start the discovery. 4214 if not discovering and datetime.now() < self.deadline: 4215 self.adapter_client.start_discovery( 4216 method_callback=self.start_discovery_rsp) 4217 4218 def start_discovery_rsp(self, err, result): 4219 """Result to |adapter_client.start_discovery|.""" 4220 # Log any errors that may have occurred 4221 if err: 4222 logging.error('Error on start_discovery: %s', err) 4223 elif result: 4224 logging.error('Error on start_discovery: Status=%s', result) 4225 4226 def __init__(self): 4227 # Init the BaseFacade first 4228 super(FlossFacadeLocal, self).__init__() 4229 4230 # Start mainloop thread in background. This will also initialize a few 4231 # other variables (self.bus, self.mainloop, self.event_context) that may 4232 # be necessary for proper operation. 4233 self.mainloop_quit = threading.Event() 4234 self.mainloop_ready = threading.Event() 4235 self.thread = threading.Thread( 4236 name=GLIB_THREAD_NAME, 4237 target=FlossFacadeLocal.mainloop_thread, 4238 args=(self, )) 4239 self.thread.start() 4240 4241 # Wait for mainloop to be ready 4242 if not self.mainloop_ready.wait(timeout=5): 4243 raise Exception('Unable to initialize GLib mainloop') 4244 4245 # Always initialize the manager client since there is a single instance. 4246 self.manager_client = FlossManagerClient(self.bus) 4247 self.adapter_client = FlossAdapterClient(self.bus, 4248 self.DEFAULT_ADAPTER) 4249 4250 self.is_clean = False 4251 4252 # Discovery needs to last longer than the default 12s. Keep an observer 4253 # that re-enables discovery up to some timeout. 4254 self.discovery_observer = None 4255 4256 # Cache some mock properties for testing. These may be properties that 4257 # are required in bluez but don't carry over well into Floss. 4258 self.mock_properties = {} 4259 4260 def __del__(self): 4261 if not self.is_clean: 4262 self.cleanup() 4263 4264 def cleanup(self): 4265 """Clean up the mainloop thread.""" 4266 self.mainloop_quit.set() 4267 self.mainloop.quit() 4268 self.is_clean = True 4269 4270 @staticmethod 4271 def mainloop_thread(self): 4272 """Runs GLib mainloop until we signal that we should quit.""" 4273 4274 # Set up mainloop. All subsequent buses and connections will use this 4275 # mainloop. We also use a separate main context to avoid multithreading 4276 # issues. 4277 #self.event_context = GLib.MainContext() 4278 #self.mainloop = GLib.MainLoop(context=self.event_context) 4279 GLib.threads_init() 4280 self.mainloop = GLib.MainLoop() 4281 4282 # Set up bus connection 4283 self.bus = pydbus.SystemBus() 4284 4285 # Set thread ready 4286 self.mainloop_ready.set() 4287 4288 while not self.mainloop_quit.is_set(): 4289 self.mainloop.run() 4290 4291 def get_floss_enabled(self): 4292 """Is Floss enabled right now? 4293 4294 Returns: 4295 True if Floss is enabled, False if Bluez is enabled. 4296 """ 4297 return self.manager_client.get_floss_enabled() 4298 4299 def set_floss_enabled(self, enabled): 4300 """Enable or disable Floss.""" 4301 self.manager_client.set_floss_enabled(enabled) 4302 4303 def start_bluetoothd(self): 4304 """Starts Floss. This includes enabling the adapter. 4305 4306 Returns: 4307 True if default adapter is enabled successfully. False otherwise. 4308 """ 4309 # Start manager and enable Floss 4310 if not self.configure_floss(enabled=True): 4311 return False 4312 4313 # Restarts the default adapter 4314 if not self.reset_on(): 4315 return False 4316 4317 # If we need to wait for any other interfaces, add below here: 4318 # ------------------------------------------------------------ 4319 4320 return True 4321 4322 def stop_bluetoothd(self): 4323 """Stops Floss. This includes disabling btmanagerd. 4324 4325 Returns: 4326 True if adapter daemon and manager daemon are both off. 4327 """ 4328 # First power off the adapter 4329 if not self.reset_off(): 4330 logging.warn('Failed to stop btadapterd') 4331 return False 4332 4333 if not UpstartClient.stop(self.MANAGER_JOB): 4334 logging.warn('Failed to stop btmanagerd') 4335 return False 4336 4337 def _daemon_stopped(): 4338 return all([ 4339 not self.manager_client.has_proxy(), 4340 not self.adapter_client.has_proxy(), 4341 ]) 4342 4343 try: 4344 utils.poll_for_condition(condition=_daemon_stopped, 4345 desc='Bluetooth daemons have stopped', 4346 timeout=self.DAEMON_TIMEOUT_SEC) 4347 daemon_stopped = True 4348 except Exception as e: 4349 logging.error('timeout: error stopping floss daemons: %s', e) 4350 daemon_stopped = False 4351 4352 return daemon_stopped 4353 4354 def restart_cras(self): 4355 """Restarts the cras daemon.""" 4356 self._restart_cras(enable_floss=True) 4357 4358 def is_bluetoothd_proxy_valid(self): 4359 """Checks whether the proxy objects for Floss are ok.""" 4360 return all([ 4361 self.manager_client.has_proxy(), 4362 self.adapter_client.has_proxy() 4363 ]) 4364 4365 def is_bluetoothd_running(self): 4366 """Checks whether Floss daemon is running.""" 4367 # This api doesn't enforce that the adapter is powered so we only check 4368 # that the manager proxy is up. 4369 return self.manager_client.has_proxy() 4370 4371 def has_adapter(self): 4372 """Checks whether an adapter exists.""" 4373 return len(self.manager_client.get_available_adapters()) > 0 4374 4375 def set_debug_log_levels(self, bluez_vb, kernel_vb): 4376 """Enables verbose logging.""" 4377 # TODO(abps) - This will be necessary for Floss but may not need to 4378 # touch the kernel. This needs to be implemented at the 4379 # daemon level still. 4380 return False 4381 4382 def start_discovery(self): 4383 """Start discovery of remote devices.""" 4384 if not self.adapter_client.has_proxy(): 4385 return (False, 'Adapter not found') 4386 4387 if self.discovery_observer: 4388 self.discovery_observer.cleanup() 4389 4390 self.discovery_observer = self.DiscoveryObserver( 4391 self.adapter_client, self.DISCOVERY_TIMEOUT_SEC) 4392 return (self.adapter_client.start_discovery(), '') 4393 4394 def stop_discovery(self): 4395 """Stop discovery of remote deviecs.""" 4396 if not self.adapter_client.has_proxy(): 4397 return (False, 'Adapter not found') 4398 4399 if self.discovery_observer: 4400 self.discovery_observer.cleanup() 4401 self.discovery_observer = None 4402 4403 return (self.adapter_client.stop_discovery(), '') 4404 4405 def is_discovering(self): 4406 """Check if adapter is discovering.""" 4407 return self.adapter_client.is_discovering() 4408 4409 def is_powered_on(self): 4410 """Gets whether the default adapter is enabled.""" 4411 default_adapter = self.manager_client.get_default_adapter() 4412 return self.manager_client.get_adapter_enabled(default_adapter) 4413 4414 def set_powered(self, powered): 4415 """Sets the default adapter's enabled state.""" 4416 default_adapter = self.manager_client.get_default_adapter() 4417 4418 if powered and not self.manager_client.has_default_adapter(): 4419 logging.warning('set_powered: Default adapter not available.') 4420 return False 4421 4422 if powered: 4423 self.manager_client.start(default_adapter) 4424 else: 4425 self.manager_client.stop(default_adapter) 4426 4427 return True 4428 4429 def reset_on(self): 4430 """Reset the default adapter into an ON state.""" 4431 return self.do_reset(True) 4432 4433 def reset_off(self): 4434 """Reset the default adapter into an OFF state.""" 4435 return self.do_reset(False) 4436 4437 def do_reset(self, power_on): 4438 """Resets the default adapter.""" 4439 # Start manager and enable Floss if not already up 4440 if not self.configure_floss(enabled=True): 4441 return False 4442 4443 default_adapter = self.manager_client.get_default_adapter() 4444 4445 def _is_adapter_down(client): 4446 return lambda: not client.has_proxy() 4447 4448 def _is_adapter_ready(client): 4449 return lambda: client.has_proxy() and client.get_address() 4450 4451 self.manager_client.stop(default_adapter) 4452 try: 4453 condition = _is_adapter_down(self.adapter_client) 4454 utils.poll_for_condition(condition=condition, 4455 desc='Wait for adapter stop', 4456 sleep_interval=0.5, 4457 timeout=self.ADAPTER_DAEMON_TIMEOUT_SEC) 4458 except Exception as e: 4459 logging.error('timeout: error stopping adapter daemon: %s', e) 4460 logging.error(traceback.format_exc()) 4461 return False 4462 4463 if not power_on: 4464 logging.debug('do_reset: Completed with power_on=False') 4465 return True 4466 4467 # Start the client again 4468 self.manager_client.start(default_adapter) 4469 self.adapter_client = FlossAdapterClient(self.bus, default_adapter) 4470 4471 try: 4472 condition = _is_adapter_ready(self.adapter_client) 4473 utils.poll_for_condition(condition=condition, 4474 desc='Wait for adapter start', 4475 sleep_interval=0.5, 4476 timeout=self.ADAPTER_DAEMON_TIMEOUT_SEC) 4477 except Exception as e: 4478 logging.error('timeout: error starting adapter daemon: %s', e) 4479 logging.error(traceback.format_exc()) 4480 return False 4481 4482 # We need to observe callbacks for proper operation. 4483 if not self.adapter_client.register_callbacks(): 4484 logging.error('adapter_client: Failed to register callbacks') 4485 return False 4486 4487 logging.debug('do_reset: Completed with power_on=True') 4488 return True 4489 4490 def policy_get_service_allow_list(self): 4491 """Gets the service allow list for enterprise policy.""" 4492 # TODO(abps) - Actually implement this 4493 return [] 4494 4495 def policy_set_service_allow_list(self, uuids): 4496 """Sets the service allow list for enterprise policy.""" 4497 # TODO(abps) - Actually implement this 4498 return (True, '') 4499 4500 def get_address(self): 4501 """Gets the default adapter address.""" 4502 return self.adapter_client.get_address() 4503 4504 def has_device(self, address): 4505 """Checks if adapter knows the device.""" 4506 return self.adapter_client.has_device(address) 4507 4508 def remove_device_object(self, address): 4509 """Removes a known device object.""" 4510 return self.adapter_client.forget_device(address) 4511 4512 def connect_device(self, address): 4513 """Connect a specific address.""" 4514 return self.adapter_client.connect_all_enabled_profiles(address) 4515 4516 def disconnect_device(self, address): 4517 """Disconnect a specific address.""" 4518 return self.adapter_client.disconnect_all_enabled_profiles(address) 4519 4520 def get_device_property(self, address, prop_name): 4521 """Read a property from a remote device. 4522 4523 @param address: Address of the device to query 4524 @param prop_name: Property to be queried 4525 4526 @return Base64 encoded json if property exists or None. 4527 """ 4528 prop_val = None 4529 4530 if self.adapter_client.has_device(address): 4531 prop_val = self.adapter_client.get_remote_property( 4532 address, prop_name) 4533 4534 return self._encode_base64_json(prop_val) 4535 4536 def get_pairable(self): 4537 """Gets whether the default adapter is pairable. 4538 4539 @return True if default adapter is pairable. 4540 """ 4541 # TODO(abps) - Control pairable setting on adapter 4542 return self.mock_properties.get('Pairable', False) 4543 4544 def set_pairable(self, pairable): 4545 """Sets default adapter as pairable. 4546 4547 @param pairable: Control pairable property of the adapter. 4548 4549 @return True on success. 4550 """ 4551 # TODO(abps) - Control pairable setting on adapter 4552 self.mock_properties['Pairable'] = pairable 4553 return True 4554 4555 def pair_legacy_device(self, address, pin, trusted, timeout=60): 4556 """Pairs a peer device. 4557 4558 @param address: BT address of the peer device. 4559 @param pin: What pin to use for pairing. 4560 @param trusted: Unused by Floss. 4561 @param timeout: How long to wait for pairing to complete. 4562 """ 4563 4564 class PairingObserver(BluetoothCallbacks, 4565 BluetoothConnectionCallbacks): 4566 """Observer of certain callbacks for pairing.""" 4567 4568 def __init__(self, adapter_client, done_event, address, pin): 4569 self.adapter_client = adapter_client 4570 self.adapter_client.register_callback_observer( 4571 'PairingObserver' + address, self) 4572 4573 # Event to trigger once we are paired and connected. 4574 self.done_event = done_event 4575 self.address = address 4576 self.pin = pin 4577 self.bond_state = BondState.NOT_BONDED 4578 self.connected = self.adapter_client.is_connected(address) 4579 4580 def __del__(self): 4581 """Destructor""" 4582 if self.adapter_client: 4583 self.cleanup() 4584 4585 def cleanup(self): 4586 """Clean up after this observer.""" 4587 self.adapter_client.unregister_callback_observer( 4588 'PairingObserver' + address, self) 4589 self.adapter_client = None 4590 4591 def on_bond_state_changed(self, status, device_address, state): 4592 """Handle bond state change.""" 4593 logging.info('[%s] bond state=%d', device_address, state) 4594 4595 if device_address != self.address: 4596 return 4597 4598 # If we have a non-zero status, bonding failed in some way. 4599 # Report it and unblock the main thread. 4600 if status != 0: 4601 logging.error('[%s] failed to bond. Status=%d, State=%d', 4602 device_address, status, state) 4603 self.done_event.set() 4604 return 4605 4606 self.bond_state = state 4607 logging.info('[%s] bond state=%d', device_address, state) 4608 4609 # We've completed bonding. Make sure to connect 4610 if state == BondState.BONDED: 4611 # If not connected, connect profiles and wait for connected 4612 # callback. Else, unblock the main thread. 4613 if not self.connected: 4614 if not self.adapter_client.connect_all_enabled_profiles( 4615 self.address): 4616 logging.error( 4617 '[%s] failed on connect_all_enabled_profiles', 4618 self.address) 4619 self.done_event.set() 4620 else: 4621 self.done_event.set() 4622 4623 def on_ssp_request(self, remote_device, class_of_device, variant, 4624 passkey): 4625 """Handle SSP request.""" 4626 (remote_address, remote_name) = remote_device 4627 4628 if remote_address != self.address: 4629 return 4630 4631 logging.info('Ssp: [%s: %s]: Class=%d, Variant=%d, Passkey=%d', 4632 remote_address, remote_name, class_of_device, 4633 variant, passkey) 4634 4635 if variant == int(SspVariant.CONSENT): 4636 self.adapter_client.set_pairing_confirmation( 4637 remote_address, 4638 True, 4639 method_callback=self.on_set_pairing_confirmation) 4640 4641 logging.info('Exited ssp request.') 4642 4643 def on_set_pairing_confirmation(self, err, result): 4644 """Handle async method result from set pairing confirmation.""" 4645 if err or not result: 4646 logging.error( 4647 'Pairing confirmation failed: err[%s], result[%s]', 4648 err, result) 4649 self.done_event.set() 4650 4651 def on_device_connected(self, remote_device): 4652 """Handle device connection.""" 4653 (remote_address, _) = remote_device 4654 4655 logging.info('[%s] connected', remote_address) 4656 4657 if remote_address != self.address: 4658 return 4659 4660 self.connected = True 4661 4662 # If we're already bonded, unblock the main thread. 4663 if self.bond_state == BondState.BONDED: 4664 self.done_event.set() 4665 4666 # Start pairing process in main thread 4667 4668 done_evt = threading.Event() 4669 4670 # First we need an observer that watches for callbacks 4671 pairing_observer = PairingObserver(self.adapter_client, done_evt, 4672 address, pin) 4673 4674 # Pair and connect. If either action fails, mark the done event so that 4675 # we fall through without blocking. 4676 if not self.device_is_paired(address): 4677 if not self.adapter_client.create_bond(address, Transport.AUTO): 4678 done_evt.set() 4679 elif not self.device_is_connected(address): 4680 if not self.adapter_client.connect_all_enabled_profiles(address): 4681 done_evt.set() 4682 4683 done_evt.wait(timeout=timeout) 4684 if not done_evt.is_set(): 4685 logging.error('Timed out waiting for pairing to complete.') 4686 4687 is_paired = self.device_is_paired(address) 4688 is_connected = self.device_is_connected(address) 4689 4690 # If pairing and hci connection is complete, also trigger all profile 4691 # connections here. This is necessary because device connection doesn't 4692 # always imply profile connection. 4693 if is_paired and is_connected: 4694 self.adapter_client.connect_all_enabled_profiles(address) 4695 4696 logging.info('Pairing result: paired(%s) connected(%s)', is_paired, 4697 is_connected) 4698 4699 return is_paired and is_connected 4700 4701 def device_is_connected(self, address): 4702 """Checks whether a device is connected. 4703 4704 @param address: BT address of peer device. 4705 @return True if connected. 4706 """ 4707 return self.adapter_client.is_connected(address) 4708 4709 def has_connection_info(self, address): 4710 """Same as |device_is_connected| on Floss. 4711 4712 Bluez has a separate ConnectionInfo tuple that is read from the kernel 4713 but Floss doesn't have this. We have this function simply for 4714 compatibility. 4715 4716 @param address: BT address of peer device. 4717 @return True if connected. 4718 """ 4719 return self.device_is_connected(address) 4720 4721 def get_num_connected_devices(self): 4722 """ Return number of remote devices currently connected to the DUT. 4723 4724 @returns: The number of devices known to bluez with the Connected 4725 property active 4726 """ 4727 return self.adapter_client.get_connected_devices_count() 4728 4729 def device_is_paired(self, address): 4730 """Checks if a device is paired. 4731 4732 @param address: address of the device. 4733 @returns: True if device is paired. False otherwise. 4734 """ 4735 return self.adapter_client.is_bonded(address) 4736 4737 def is_discoverable(self): 4738 """Return whether the adapter is currently discoverable.""" 4739 return self.adapter_client.get_property('Discoverable') 4740 4741 def set_discoverable(self, discoverable, duration=60): 4742 """Sets the adapter as discoverable for given duration in seconds.""" 4743 return self.adapter_client.set_property('Discoverable', discoverable, 4744 duration) 4745 4746 def get_supported_capabilities(self): 4747 """" Get supported capabilities of the adapter.""" 4748 return (json.dumps({}), 'Not yet implemented') 4749