1"""Controller class for an android bt device with git_master-bds-dev build. 2 3The config for this derived_bt_target_device in mobileharness is: 4- name: android_bt_target_device 5 devices: 6 - type: MiscTestbedSubDevice 7 dimensions: 8 mobly_type: DerivedBtDevice 9 properties: 10 ModuleName: android_bt_target_device 11 ClassName: AndroidBtTargetDevice 12 Params: 13 config: 14 device_id: phone_serial_number 15 audio_params: 16 channel: 2 17 duration: 50 18 music_file: "music.wav" 19 sample_rate: 44100 20""" 21 22import logging 23import os 24import time 25from typing import Any, Dict, Optional 26 27from mobly import asserts 28from mobly import signals 29from mobly.controllers import android_device 30 31# Internal import 32from blueberry.utils import android_bluetooth_decorator 33from blueberry.utils import bt_constants 34from blueberry.utils import bt_test_utils as btutils 35 36_CONNECTION_STATE = bt_constants.BluetoothConnectionStatus 37 38ADB_FILE = 'rec.pcm' 39ADB_PATH = '/sdcard/Music/' 40WAVE_FILE_TEMPLATE = 'recorded_audio_%s.wav' 41DEFAULT_WAIT_TIME = 3.0 42 43# A MediaBrowserService implemented in the SL4A app to intercept Media keys and 44# commands. 45BLUETOOTH_SL4A_AUDIO_SRC_MBS = 'BluetoothSL4AAudioSrcMBS' 46 47A2DP_HFP_PROFILES = [ 48 bt_constants.BluetoothProfile.A2DP_SINK, 49 bt_constants.BluetoothProfile.HEADSET_CLIENT 50] 51 52 53class AndroidBtTargetDevice(object): 54 """Implements an android device as a hfp and a2dp sink device. 55 56 With git_master-bds-dev build, the android device can act as a bluetooth 57 hfp and a2dp sink device. 58 """ 59 60 def __init__(self, config: Dict[str, Any]) -> None: 61 """Initializes an android hfp device.""" 62 logging.info('Initializes the android hfp device') 63 self.config = config 64 self.pri_ad = None 65 self.sec_ad = None 66 self.serial = config.get('device_id', None) 67 self.audio_params = config.get('audio_params', None) 68 69 if self.serial: 70 # self._ad for accessing the device at the end of the test 71 self._ad = android_device.AndroidDevice(self.serial) 72 self.aud = adb_ui_device.AdbUiDevice(self._ad) 73 self.pri_ad = android_bluetooth_decorator.AndroidBluetoothDecorator( 74 self._ad) 75 self.pri_ad.init_setup() 76 self.pri_ad.sl4a_setup() 77 self.sl4a = self._ad.services.sl4a 78 self.mac_address = self.sl4a.bluetoothGetLocalAddress() 79 80 if self.audio_params: 81 self._initialize_audio_params() 82 self.avrcp_ready = False 83 84 def __getattr__(self, name: str) -> Any: 85 return getattr(self.pri_ad, name) 86 87 def _disable_profiles(self) -> None: 88 if self.sec_ad is None: 89 raise MissingBtClientDeviceError('Please provide sec_ad forsetting' 90 'profiles') 91 self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES) 92 93 def _initialize_audio_params(self) -> None: 94 self.audio_capture_path = os.path.join(self._ad.log_path, 'audio_capture') 95 os.makedirs(self.audio_capture_path) 96 self.adb_path = os.path.join(ADB_PATH, ADB_FILE) 97 self.wave_file_template = os.path.join(self.audio_capture_path, 98 WAVE_FILE_TEMPLATE) 99 self.wave_file_number = 0 100 101 def _verify_pri_ad(self) -> None: 102 if not self.pri_ad: 103 raise signals.ControllerError('No be target device') 104 105 def clean_up(self) -> None: 106 """Resets Bluetooth and stops all services when the device is destroyed.""" 107 self.deactivate_ble_pairing_mode() 108 self.factory_reset_bluetooth() 109 self._ad.services.stop_all() 110 111 def a2dp_sink_connect(self) -> bool: 112 """Establishes the hft connection between self.pri_ad and self.sec_ad.""" 113 self._verify_pri_ad() 114 connected = self.pri_ad.a2dp_sink_connect(self.sec_ad) 115 asserts.assert_true( 116 connected, 'The a2dp sink connection between {} and {} failed'.format( 117 self.serial, self.sec_ad.serial)) 118 self.log.info('The a2dp sink connection between %s and %s succeeded', 119 self.serial, self.sec_ad.serial) 120 return True 121 122 def activate_pairing_mode(self) -> None: 123 """Makes the android hfp device discoverable over Bluetooth.""" 124 self.log.info('Activating the pairing mode of the android target device') 125 self.pri_ad.activate_pairing_mode() 126 127 def activate_ble_pairing_mode(self) -> None: 128 """Activates BLE pairing mode on an AndroidBtTargetDevice.""" 129 self.pri_ad.activate_ble_pairing_mode() 130 131 def deactivate_ble_pairing_mode(self) -> None: 132 """Deactivates BLE pairing mode on an AndroidBtTargetDevice.""" 133 self.pri_ad.deactivate_ble_pairing_mode() 134 135 def add_pri_ad_device(self, pri_ad: android_device.AndroidDevice) -> None: 136 """Adds primary android device as bt target device. 137 138 The primary android device should have been initialized with 139 android_bluetooth_decorator. 140 141 Args: 142 pri_ad: the primary android device as bt target device. 143 """ 144 self._ad = pri_ad 145 self.pri_ad = pri_ad 146 self.sl4a = self._ad.services.sl4a 147 self.mac_address = self.sl4a.bluetoothGetLocalAddress() 148 self.log = self.pri_ad.log 149 self.serial = self.pri_ad.serial 150 self.log.info( 151 'Adds primary android device with id %s for the bluetooth' 152 'connection', pri_ad.serial) 153 if self.audio_params: 154 self._initialize_audio_params() 155 156 def add_sec_ad_device(self, sec_ad: android_device.AndroidDevice) -> None: 157 """Adds second android device for bluetooth connection. 158 159 The second android device should have sl4a service acitvated. 160 161 Args: 162 sec_ad: the second android device for bluetooth connection. 163 """ 164 self.log.info( 165 'Adds second android device with id %s for the bluetooth' 166 'connection', sec_ad.serial) 167 self.sec_ad = sec_ad 168 self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress() 169 170 def answer_phone_call(self) -> bool: 171 """Answers an incoming phone call.""" 172 if not self.is_hfp_connected(): 173 self.hfp_connect() 174 # Make sure the device is in ringing state. 175 if not self.wait_for_call_state( 176 bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): 177 raise signals.ControllerError( 178 'Timed out after %ds waiting for the device %s to be ringing state ' 179 'before anwsering the incoming phone call.' % 180 (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) 181 self.log.info('Answers the incoming phone call from hf phone %s for %s', 182 self.mac_address, self.sec_ad_mac_address) 183 return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address) 184 185 def call_volume_down(self) -> None: 186 """Lowers the volume.""" 187 current_volume = self.mbs.getVoiceCallVolume() 188 if current_volume > 0: 189 change_volume = current_volume - 1 190 self.log.debug('Set voice call volume from %d to %d.' % 191 (current_volume, change_volume)) 192 self.mbs.setVoiceCallVolume(change_volume) 193 194 def call_volume_up(self) -> None: 195 """Raises the volume.""" 196 current_volume = self.mbs.getVoiceCallVolume() 197 if current_volume < self.mbs.getVoiceCallMaxVolume(): 198 change_volume = current_volume + 1 199 self.log.debug('Set voice call volume from %d to %d.' % 200 (current_volume, change_volume)) 201 self.mbs.setVoiceCallVolume(change_volume) 202 203 def disconnect_all(self) -> None: 204 self._disable_profiles() 205 206 def factory_reset_bluetooth(self) -> None: 207 """Factory resets Bluetooth on the android hfp device.""" 208 self.log.info('Factory resets Bluetooth on the android target device') 209 self.pri_ad.factory_reset_bluetooth() 210 211 def get_bluetooth_mac_address(self) -> str: 212 """Gets Bluetooth mac address of this android_bt_device.""" 213 self.log.info('Getting Bluetooth mac address for AndroidBtTargetDevice.') 214 mac_address = self.sl4a.bluetoothGetLocalAddress() 215 self.log.info('Bluetooth mac address of AndroidBtTargetDevice: %s', 216 mac_address) 217 return mac_address 218 219 def get_audio_params(self) -> Optional[Dict[str, str]]: 220 """Gets audio params from the android_bt_target_device.""" 221 return self.audio_params 222 223 def get_new_wave_file_path(self) -> str: 224 """Gets a new wave file path for the audio capture.""" 225 wave_file_path = self.wave_file_template % self.wave_file_number 226 while os.path.exists(wave_file_path): 227 self.wave_file_number += 1 228 wave_file_path = self.wave_file_template % self.wave_file_number 229 return wave_file_path 230 231 def get_unread_messages(self) -> None: 232 """Gets unread messages from the connected device (MSE).""" 233 self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address) 234 235 def hangup_phone_call(self) -> bool: 236 """Hangs up an ongoing phone call.""" 237 if not self.is_hfp_connected(): 238 self.hfp_connect() 239 self.log.info('Hangs up the phone call from hf phone %s for %s', 240 self.mac_address, self.sec_ad_mac_address) 241 return self.sl4a.bluetoothHfpClientTerminateAllCalls( 242 self.sec_ad_mac_address) 243 244 def hfp_connect(self) -> bool: 245 """Establishes the hft connection between self.pri_ad and self.sec_ad.""" 246 self._verify_pri_ad() 247 connected = self.pri_ad.hfp_connect(self.sec_ad) 248 asserts.assert_true( 249 connected, 'The hfp connection between {} and {} failed'.format( 250 self.serial, self.sec_ad.serial)) 251 self.log.info('The hfp connection between %s and %s succeed', self.serial, 252 self.sec_ad.serial) 253 return connected 254 255 def init_ambs_for_avrcp(self) -> bool: 256 """Initializes media browser service for avrcp. 257 258 This is required to be done before running any of the passthrough 259 commands. 260 261 Steps: 262 1. Starts up the AvrcpMediaBrowserService on the A2dp source phone. This 263 MediaBrowserService is part of the SL4A app. 264 2. Switch the playback state to be paused. 265 3. Connects a MediaBrowser to the A2dp sink's A2dpMediaBrowserService. 266 267 Returns: 268 True: if it is avrcp ready after the initialization. 269 False: if it is still not avrcp ready after the initialization. 270 271 Raises: 272 Signals.ControllerError: raise if AvrcpMediaBrowserService on the A2dp 273 source phone fails to be started. 274 """ 275 if self.is_avrcp_ready(): 276 return True 277 if not self.is_a2dp_sink_connected(): 278 self.a2dp_sink_connect() 279 280 self.sec_ad.log.info('Starting AvrcpMediaBrowserService') 281 self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart() 282 283 time.sleep(DEFAULT_WAIT_TIME) 284 285 # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad. 286 active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions() 287 if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions: 288 raise signals.ControllerError('Failed to start AvrcpMediaBrowserService.') 289 290 self.log.info('Connecting to A2dp media browser service') 291 self.sl4a.bluetoothMediaConnectToCarMBS() 292 293 # TODO(user) Wait for an event back instead of sleep 294 time.sleep(DEFAULT_WAIT_TIME) 295 self.avrcp_ready = True 296 return self.avrcp_ready 297 298 def is_avrcp_ready(self) -> bool: 299 """Checks if the pri_ad and sec_ad are ready for avrcp.""" 300 self._verify_pri_ad() 301 if self.avrcp_ready: 302 return True 303 active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions() 304 if not active_sessions: 305 self.log.info('The device is not avrcp ready') 306 self.avrcp_ready = False 307 else: 308 self.log.info('The device is avrcp ready') 309 self.avrcp_ready = True 310 return self.avrcp_ready 311 312 def is_hfp_connected(self) -> _CONNECTION_STATE: 313 """Checks if the pri_ad and sec_ad are hfp connected.""" 314 self._verify_pri_ad() 315 if self.sec_ad is None: 316 raise MissingBtClientDeviceError('The sec_ad was not added') 317 return self.sl4a.bluetoothHfpClientGetConnectionStatus( 318 self.sec_ad_mac_address) 319 320 def is_a2dp_sink_connected(self) -> _CONNECTION_STATE: 321 """Checks if the pri_ad and sec_ad are hfp connected.""" 322 self._verify_pri_ad() 323 if self.sec_ad is None: 324 raise MissingBtClientDeviceError('The sec_ad was not added') 325 return self.sl4a.bluetoothA2dpSinkGetConnectionStatus( 326 self.sec_ad_mac_address) 327 328 def last_number_dial(self) -> None: 329 """Redials last outgoing phone number.""" 330 if not self.is_hfp_connected(): 331 self.hfp_connect() 332 self.log.info('Redials last number from hf phone %s for %s', 333 self.mac_address, self.sec_ad_mac_address) 334 self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None) 335 336 def map_connect(self) -> None: 337 """Establishes the map connection between self.pri_ad and self.sec_ad.""" 338 self._verify_pri_ad() 339 connected = self.pri_ad.map_connect(self.sec_ad) 340 asserts.assert_true( 341 connected, 'The map connection between {} and {} failed'.format( 342 self.serial, self.sec_ad.serial)) 343 self.log.info('The map connection between %s and %s succeed', self.serial, 344 self.sec_ad.serial) 345 346 def map_disconnect(self) -> None: 347 """Initiates a map disconnection to the connected device. 348 349 Raises: 350 BluetoothProfileConnectionError: raised if failed to disconnect. 351 """ 352 self._verify_pri_ad() 353 if not self.pri_ad.map_disconnect(self.sec_ad_mac_address): 354 raise BluetoothProfileConnectionError( 355 'Failed to terminate the MAP connection with the device "%s".' % 356 self.sec_ad_mac_address) 357 358 def pbap_connect(self) -> None: 359 """Establishes the pbap connection between self.pri_ad and self.sec_ad.""" 360 connected = self.pri_ad.pbap_connect(self.sec_ad) 361 asserts.assert_true( 362 connected, 'The pbap connection between {} and {} failed'.format( 363 self.serial, self.sec_ad.serial)) 364 self.log.info('The pbap connection between %s and %s succeed', self.serial, 365 self.sec_ad.serial) 366 367 def pause(self) -> None: 368 """Sends Avrcp pause command.""" 369 self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad) 370 371 def play(self) -> None: 372 """Sends Avrcp play command.""" 373 self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad) 374 375 def power_on(self) -> bool: 376 """Turns the Bluetooth on the android bt garget device.""" 377 self.log.info('Turns on the bluetooth') 378 return self.sl4a.bluetoothToggleState(True) 379 380 def power_off(self) -> bool: 381 """Turns the Bluetooth off the android bt garget device.""" 382 self.log.info('Turns off the bluetooth') 383 return self.sl4a.bluetoothToggleState(False) 384 385 def route_call_audio(self, connect: bool = False) -> None: 386 """Routes call audio during a call.""" 387 if not self.is_hfp_connected(): 388 self.hfp_connect() 389 self.log.info( 390 'Routes call audio during a call from hf phone %s for %s ' 391 'audio connection %s after routing', self.mac_address, 392 self.sec_ad_mac_address, connect) 393 if connect: 394 self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address) 395 else: 396 self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address) 397 398 def reject_phone_call(self) -> bool: 399 """Rejects an incoming phone call.""" 400 if not self.is_hfp_connected(): 401 self.hfp_connect() 402 # Make sure the device is in ringing state. 403 if not self.wait_for_call_state( 404 bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC): 405 raise signals.ControllerError( 406 'Timed out after %ds waiting for the device %s to be ringing state ' 407 'before rejecting the incoming phone call.' % 408 (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial)) 409 self.log.info('Rejects the incoming phone call from hf phone %s for %s', 410 self.mac_address, self.sec_ad_mac_address) 411 return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address) 412 413 def set_audio_params(self, audio_params: Optional[Dict[str, str]]) -> None: 414 """Sets audio params to the android_bt_target_device.""" 415 self.audio_params = audio_params 416 417 def track_previous(self) -> None: 418 """Sends Avrcp skip prev command.""" 419 self.send_media_passthrough_cmd( 420 bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad) 421 422 def track_next(self) -> None: 423 """Sends Avrcp skip next command.""" 424 self.send_media_passthrough_cmd( 425 bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad) 426 427 def start_audio_capture(self, duration_sec: int = 20) -> None: 428 """Starts the audio capture over adb. 429 430 Args: 431 duration_sec: int, Number of seconds to record audio, 20 secs as default. 432 """ 433 if 'duration' in self.audio_params.keys(): 434 duration_sec = self.audio_params['duration'] 435 if not self.is_a2dp_sink_connected(): 436 self.a2dp_sink_connect() 437 cmd = 'ap2f --usage 1 --start --duration {} --target {}'.format( 438 duration_sec, self.adb_path) 439 self.log.info('Starts capturing audio with adb shell command %s', cmd) 440 self.adb.shell(cmd) 441 442 def stop_audio_capture(self) -> str: 443 """Stops the audio capture and stores it in wave file. 444 445 Returns: 446 File name of the recorded file. 447 448 Raises: 449 MissingAudioParamsError: when self.audio_params is None 450 """ 451 if self.audio_params is None: 452 raise MissingAudioParamsError('Missing audio params for capturing audio') 453 if not self.is_a2dp_sink_connected(): 454 self.a2dp_sink_connect() 455 adb_pull_args = [self.adb_path, self.audio_capture_path] 456 self.log.info('start adb -s %s pull %s', self.serial, adb_pull_args) 457 self._ad.adb.pull(adb_pull_args) 458 pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE) 459 self.log.info('delete the recored file %s', self.adb_path) 460 self._ad.adb.shell('rm {}'.format(self.adb_path)) 461 wave_file_path = self.get_new_wave_file_path() 462 self.log.info('convert pcm file %s to wav file %s', pcm_file_path, 463 wave_file_path) 464 btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params) 465 return wave_file_path 466 467 def stop_all_services(self) -> None: 468 """Stops all services for the pri_ad device.""" 469 self.log.info('Stops all services on the android bt target device') 470 self._ad.services.stop_all() 471 472 def stop_ambs_for_avrcp(self) -> None: 473 """Stops media browser service for avrcp.""" 474 if self.is_avrcp_ready(): 475 self.log.info('Stops avrcp connection') 476 self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop() 477 self.avrcp_ready = False 478 479 def stop_voice_dial(self) -> None: 480 """Stops voice dial.""" 481 if not self.is_hfp_connected(): 482 self.hfp_connect() 483 self.log.info('Stops voice dial from hf phone %s for %s', self.mac_address, 484 self.sec_ad_mac_address) 485 if self.is_hfp_connected(): 486 self.sl4a.bluetoothHfpClientStopVoiceRecognition( 487 self.sec_ad_mac_address) 488 489 def take_bug_report(self, 490 test_name: Optional[str] = None, 491 begin_time: Optional[int] = None, 492 timeout: float = 300, 493 destination: Optional[str] = None) -> None: 494 """Wrapper method to capture bugreport on the android bt target device.""" 495 self._ad.take_bug_report(test_name, begin_time, timeout, destination) 496 497 def voice_dial(self) -> None: 498 """Triggers voice dial.""" 499 if not self.is_hfp_connected(): 500 self.hfp_connect() 501 self.log.info('Triggers voice dial from hf phone %s for %s', 502 self.mac_address, self.sec_ad_mac_address) 503 if self.is_hfp_connected(): 504 self.sl4a.bluetoothHfpClientStartVoiceRecognition( 505 self.sec_ad_mac_address) 506 507 def log_type(self) -> str: 508 """Gets the log type of Android bt target device. 509 510 Returns: 511 A string, the log type of Android bt target device. 512 """ 513 return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR.value 514 515 516class BluetoothProfileConnectionError(Exception): 517 """Error for Bluetooth Profile connection problems.""" 518 519 520class MissingBtClientDeviceError(Exception): 521 """Error for missing required bluetooth client device.""" 522 523 524class MissingAudioParamsError(Exception): 525 """Error for missing the audio params.""" 526