• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Controller class for an android bt device with git_master-bds-dev build.
2
3The config for this derived_bt_target_device in mobileharness is:
4- name: android_bt_target_device
5  devices:
6  - type: MiscTestbedSubDevice
7    dimensions:
8      mobly_type: DerivedBtDevice
9    properties:
10      ModuleName: android_bt_target_device
11      ClassName: AndroidBtTargetDevice
12      Params:
13        config:
14          device_id: phone_serial_number
15          audio_params:
16            channel: 2
17            duration: 50
18            music_file: "music.wav"
19            sample_rate: 44100
20"""
21
22import logging
23import os
24import time
25from typing import Any, Dict, Optional
26
27from mobly import asserts
28from mobly import signals
29from mobly.controllers import android_device
30
31# Internal import
32from blueberry.utils import android_bluetooth_decorator
33from blueberry.utils import bt_constants
34from blueberry.utils import bt_test_utils as btutils
35
# Alias of the connection status type; used as the return annotation of the
# is_hfp_connected / is_a2dp_sink_connected methods in this module.
_CONNECTION_STATE = bt_constants.BluetoothConnectionStatus

# File name of the raw pcm recording produced by the audio capture.
ADB_FILE = 'rec.pcm'
# Directory that is joined with ADB_FILE to build the capture target path
# which is later pulled and removed through adb.
ADB_PATH = '/sdcard/Music/'
# Template of the converted wave file name; the placeholder is filled with a
# running file number.
WAVE_FILE_TEMPLATE = 'recorded_audio_%s.wav'
# Duration passed to time.sleep() (i.e. seconds) while waiting for the media
# browser services to come up.
DEFAULT_WAIT_TIME = 3.0

# A MediaBrowserService implemented in the SL4A app to intercept Media keys and
# commands.
BLUETOOTH_SL4A_AUDIO_SRC_MBS = 'BluetoothSL4AAudioSrcMBS'

# Profiles whose policy is switched off when all connections are dropped
# (see AndroidBtTargetDevice._disable_profiles).
A2DP_HFP_PROFILES = [
    bt_constants.BluetoothProfile.A2DP_SINK,
    bt_constants.BluetoothProfile.HEADSET_CLIENT
]
51
52
class AndroidBtTargetDevice(object):
  """Implements an android device as a hfp and a2dp sink device.

  With git_master-bds-dev build, the android device can act as a bluetooth
  hfp and a2dp sink device.

  Two android devices are involved:
    * pri_ad: the primary device, i.e. the bt target device controlled by this
      class. It is either created from `device_id` in the config or supplied
      later through `add_pri_ad_device`.
    * sec_ad: the second device which the primary device connects to. It is
      supplied through `add_sec_ad_device`.

  Attribute lookups that cannot be resolved on this object are forwarded to
  `pri_ad` (see `__getattr__`). Names used in this class without being defined
  here, such as `self.adb`, `self.mbs`, `self.wait_for_call_state` or
  `self.send_media_passthrough_cmd`, are therefore served by `pri_ad`.
  """

  def __init__(self, config: Dict[str, Any]) -> None:
    """Initializes an android hfp device.

    Args:
      config: controller configuration. The keys read here are `device_id`
        (serial of the primary android device) and `audio_params` (settings
        used for the audio capture). Both keys are optional; without a
        `device_id` the primary device has to be added afterwards with
        `add_pri_ad_device`.
    """
    logging.info('Initializes the android hfp device')
    self.config = config
    self.pri_ad = None
    self.sec_ad = None
    self.serial = config.get('device_id', None)
    self.audio_params = config.get('audio_params', None)

    if self.serial:
      # self._ad for accessing the device at the end of the test
      self._ad = android_device.AndroidDevice(self.serial)
      # NOTE(review): `adb_ui_device` is not imported in the visible part of
      # this module; presumably it belongs to the stripped internal imports.
      # Confirm, otherwise this line raises NameError.
      self.aud = adb_ui_device.AdbUiDevice(self._ad)
      self.pri_ad = android_bluetooth_decorator.AndroidBluetoothDecorator(
          self._ad)
      self.pri_ad.init_setup()
      self.pri_ad.sl4a_setup()
      self.sl4a = self._ad.services.sl4a
      self.mac_address = self.sl4a.bluetoothGetLocalAddress()

      if self.audio_params:
        self._initialize_audio_params()
    self.avrcp_ready = False

  def __getattr__(self, name: str) -> Any:
    """Forwards unresolved attribute lookups to the primary android device.

    Args:
      name: name of the attribute which was not found on this object.

    Returns:
      The attribute with the same name of `self.pri_ad`.

    Raises:
      AttributeError: if `pri_ad` has not been set on this instance yet, or if
        `pri_ad` does not provide the attribute.
    """
    # This hook only runs after the regular lookup failed. If `pri_ad` itself
    # is the missing name, reading `self.pri_ad` below would re-enter this
    # method endlessly, so fail with a regular AttributeError instead.
    if name == 'pri_ad':
      raise AttributeError('{!r} object has no attribute {!r}'.format(
          type(self).__name__, name))
    return getattr(self.pri_ad, name)

  def _disable_profiles(self) -> None:
    """Switches the a2dp sink and hfp client profile policies off.

    Raises:
      MissingBtClientDeviceError: if no second android device was added.
    """
    if self.sec_ad is None:
      raise MissingBtClientDeviceError('Please provide sec_ad for setting '
                                       'profiles')
    self.set_profiles_policy_off(self.sec_ad, A2DP_HFP_PROFILES)

  def _initialize_audio_params(self) -> None:
    """Prepares the host directory and the paths used by the audio capture."""
    self.audio_capture_path = os.path.join(self._ad.log_path, 'audio_capture')
    # This method runs from __init__ as well as from add_pri_ad_device, so the
    # directory may already exist.
    os.makedirs(self.audio_capture_path, exist_ok=True)
    self.adb_path = os.path.join(ADB_PATH, ADB_FILE)
    self.wave_file_template = os.path.join(self.audio_capture_path,
                                           WAVE_FILE_TEMPLATE)
    self.wave_file_number = 0

  def _verify_pri_ad(self) -> None:
    """Makes sure that a primary android device is available.

    Raises:
      signals.ControllerError: if `self.pri_ad` is not set.
    """
    if not self.pri_ad:
      raise signals.ControllerError('No be target device')

  def clean_up(self) -> None:
    """Resets Bluetooth and stops all services when the device is destroyed."""
    self.deactivate_ble_pairing_mode()
    self.factory_reset_bluetooth()
    self._ad.services.stop_all()

  def a2dp_sink_connect(self) -> bool:
    """Establishes the a2dp sink connection between pri_ad and sec_ad.

    Returns:
      True. A failed connection does not return but fails the assertion.

    Raises:
      signals.ControllerError: if the primary android device is missing.
    """
    self._verify_pri_ad()
    connected = self.pri_ad.a2dp_sink_connect(self.sec_ad)
    asserts.assert_true(
        connected, 'The a2dp sink connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info('The a2dp sink connection between %s and %s succeeded',
                  self.serial, self.sec_ad.serial)
    return True

  def activate_pairing_mode(self) -> None:
    """Makes the android hfp device discoverable over Bluetooth."""
    self.log.info('Activating the pairing mode of the android target device')
    self.pri_ad.activate_pairing_mode()

  def activate_ble_pairing_mode(self) -> None:
    """Activates BLE pairing mode on an AndroidBtTargetDevice."""
    self.pri_ad.activate_ble_pairing_mode()

  def deactivate_ble_pairing_mode(self) -> None:
    """Deactivates BLE pairing mode on an AndroidBtTargetDevice."""
    self.pri_ad.deactivate_ble_pairing_mode()

  def add_pri_ad_device(self, pri_ad: android_device.AndroidDevice) -> None:
    """Adds primary android device as bt target device.

    The primary android device should have been initialized with
    android_bluetooth_decorator.

    Args:
      pri_ad: the primary android device as bt target device.
    """
    self._ad = pri_ad
    self.pri_ad = pri_ad
    self.sl4a = self._ad.services.sl4a
    self.mac_address = self.sl4a.bluetoothGetLocalAddress()
    self.log = self.pri_ad.log
    self.serial = self.pri_ad.serial
    self.log.info(
        'Adds primary android device with id %s for the bluetooth '
        'connection', pri_ad.serial)
    if self.audio_params:
      self._initialize_audio_params()

  def add_sec_ad_device(self, sec_ad: android_device.AndroidDevice) -> None:
    """Adds second android device for bluetooth connection.

    The second android device should have sl4a service activated.

    Args:
      sec_ad: the second android device for bluetooth connection.
    """
    self.log.info(
        'Adds second android device with id %s for the bluetooth '
        'connection', sec_ad.serial)
    self.sec_ad = sec_ad
    self.sec_ad_mac_address = self.sec_ad.sl4a.bluetoothGetLocalAddress()

  def answer_phone_call(self) -> bool:
    """Answers an incoming phone call.

    Connects hfp first when it is not connected yet.

    Returns:
      The result of the sl4a call which accepts the phone call.

    Raises:
      signals.ControllerError: if the device does not get into the ringing
        state within bt_constants.CALL_STATE_TIMEOUT_SEC.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    # Make sure the device is in ringing state.
    if not self.wait_for_call_state(
        bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC):
      raise signals.ControllerError(
          'Timed out after %ds waiting for the device %s to be ringing state '
          'before anwsering the incoming phone call.' %
          (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial))
    self.log.info('Answers the incoming phone call from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    return self.sl4a.bluetoothHfpClientAcceptCall(self.sec_ad_mac_address)

  def call_volume_down(self) -> None:
    """Lowers the voice call volume by one step unless it is already 0."""
    current_volume = self.mbs.getVoiceCallVolume()
    if current_volume > 0:
      change_volume = current_volume - 1
      self.log.debug('Set voice call volume from %d to %d.', current_volume,
                     change_volume)
      self.mbs.setVoiceCallVolume(change_volume)

  def call_volume_up(self) -> None:
    """Raises the voice call volume by one step unless it is at maximum."""
    current_volume = self.mbs.getVoiceCallVolume()
    if current_volume < self.mbs.getVoiceCallMaxVolume():
      change_volume = current_volume + 1
      self.log.debug('Set voice call volume from %d to %d.', current_volume,
                     change_volume)
      self.mbs.setVoiceCallVolume(change_volume)

  def disconnect_all(self) -> None:
    """Disconnects by switching the a2dp sink and hfp profile policies off.

    Raises:
      MissingBtClientDeviceError: if no second android device was added.
    """
    self._disable_profiles()

  def factory_reset_bluetooth(self) -> None:
    """Factory resets Bluetooth on the android hfp device."""
    self.log.info('Factory resets Bluetooth on the android target device')
    self.pri_ad.factory_reset_bluetooth()

  def get_bluetooth_mac_address(self) -> str:
    """Gets Bluetooth mac address of this android_bt_device.

    Returns:
      The local Bluetooth address as reported by sl4a at the time of the call.
    """
    self.log.info('Getting Bluetooth mac address for AndroidBtTargetDevice.')
    mac_address = self.sl4a.bluetoothGetLocalAddress()
    self.log.info('Bluetooth mac address of AndroidBtTargetDevice: %s',
                  mac_address)
    return mac_address

  def get_audio_params(self) -> Optional[Dict[str, str]]:
    """Gets audio params from the android_bt_target_device."""
    return self.audio_params

  def get_new_wave_file_path(self) -> str:
    """Gets a new wave file path for the audio capture.

    Returns:
      The first path built from the wave file template and the running file
      number which does not exist on the host yet.
    """
    wave_file_path = self.wave_file_template % self.wave_file_number
    # Skip the numbers which were already used by earlier captures.
    while os.path.exists(wave_file_path):
      self.wave_file_number += 1
      wave_file_path = self.wave_file_template % self.wave_file_number
    return wave_file_path

  def get_unread_messages(self) -> None:
    """Gets unread messages from the connected device (MSE).

    The result of the sl4a call is not returned to the caller.
    """
    self.sl4a.mapGetUnreadMessages(self.sec_ad_mac_address)

  def hangup_phone_call(self) -> bool:
    """Hangs up an ongoing phone call.

    Connects hfp first when it is not connected yet.

    Returns:
      The result of the sl4a call which terminates all calls.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info('Hangs up the phone call from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    return self.sl4a.bluetoothHfpClientTerminateAllCalls(
        self.sec_ad_mac_address)

  def hfp_connect(self) -> bool:
    """Establishes the hfp connection between self.pri_ad and self.sec_ad.

    Returns:
      The result reported by pri_ad. A failed connection does not return but
      fails the assertion.

    Raises:
      signals.ControllerError: if the primary android device is missing.
    """
    self._verify_pri_ad()
    connected = self.pri_ad.hfp_connect(self.sec_ad)
    asserts.assert_true(
        connected, 'The hfp connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info('The hfp connection between %s and %s succeed', self.serial,
                  self.sec_ad.serial)
    return connected

  def init_ambs_for_avrcp(self) -> bool:
    """Initializes media browser service for avrcp.

    This is required to be done before running any of the passthrough
    commands.

    Steps:
      1. Starts up the AvrcpMediaBrowserService on the A2dp source phone. This
           MediaBrowserService is part of the SL4A app.
      2. Switch the playback state to be paused.
      3. Connects a MediaBrowser to the A2dp sink's A2dpMediaBrowserService.

    Returns:
      True: if it is avrcp ready after the initialization.
      False: if it is still not avrcp ready after the initialization.

    Raises:
      signals.ControllerError: raise if AvrcpMediaBrowserService on the A2dp
          source phone fails to be started.
    """
    if self.is_avrcp_ready():
      return True
    if not self.is_a2dp_sink_connected():
      self.a2dp_sink_connect()

    self.sec_ad.log.info('Starting AvrcpMediaBrowserService')
    self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStart()

    # Give the service some time to start before looking for its session.
    time.sleep(DEFAULT_WAIT_TIME)

    # Check if the media session "BluetoothSL4AAudioSrcMBS" is active on sec_ad.
    active_sessions = self.sec_ad.sl4a.bluetoothMediaGetActiveMediaSessions()
    if BLUETOOTH_SL4A_AUDIO_SRC_MBS not in active_sessions:
      raise signals.ControllerError('Failed to start AvrcpMediaBrowserService.')

    self.log.info('Connecting to A2dp media browser service')
    self.sl4a.bluetoothMediaConnectToCarMBS()

    # TODO(user) Wait for an event back instead of sleep
    time.sleep(DEFAULT_WAIT_TIME)
    self.avrcp_ready = True
    return self.avrcp_ready

  def is_avrcp_ready(self) -> bool:
    """Checks if the pri_ad and sec_ad are ready for avrcp.

    A positive result is cached in `self.avrcp_ready`; otherwise the device is
    considered ready as soon as it reports any active media session.

    Returns:
      True if the device is avrcp ready, False otherwise.

    Raises:
      signals.ControllerError: if the primary android device is missing.
    """
    self._verify_pri_ad()
    if self.avrcp_ready:
      return True
    active_sessions = self.sl4a.bluetoothMediaGetActiveMediaSessions()
    if not active_sessions:
      self.log.info('The device is not avrcp ready')
      self.avrcp_ready = False
    else:
      self.log.info('The device is avrcp ready')
      self.avrcp_ready = True
    return self.avrcp_ready

  def is_hfp_connected(self) -> _CONNECTION_STATE:
    """Checks if the pri_ad and sec_ad are hfp connected.

    Returns:
      The hfp client connection status reported by sl4a for sec_ad.

    Raises:
      signals.ControllerError: if the primary android device is missing.
      MissingBtClientDeviceError: if no second android device was added.
    """
    self._verify_pri_ad()
    if self.sec_ad is None:
      raise MissingBtClientDeviceError('The sec_ad was not added')
    return self.sl4a.bluetoothHfpClientGetConnectionStatus(
        self.sec_ad_mac_address)

  def is_a2dp_sink_connected(self) -> _CONNECTION_STATE:
    """Checks if the pri_ad and sec_ad are a2dp sink connected.

    Returns:
      The a2dp sink connection status reported by sl4a for sec_ad.

    Raises:
      signals.ControllerError: if the primary android device is missing.
      MissingBtClientDeviceError: if no second android device was added.
    """
    self._verify_pri_ad()
    if self.sec_ad is None:
      raise MissingBtClientDeviceError('The sec_ad was not added')
    return self.sl4a.bluetoothA2dpSinkGetConnectionStatus(
        self.sec_ad_mac_address)

  def last_number_dial(self) -> None:
    """Redials last outgoing phone number.

    Connects hfp first when it is not connected yet.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info('Redials last number from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    # No phone number is passed for the redial.
    self.sl4a.bluetoothHfpClientDial(self.sec_ad_mac_address, None)

  def map_connect(self) -> None:
    """Establishes the map connection between self.pri_ad and self.sec_ad.

    Raises:
      signals.ControllerError: if the primary android device is missing.
    """
    self._verify_pri_ad()
    connected = self.pri_ad.map_connect(self.sec_ad)
    asserts.assert_true(
        connected, 'The map connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info('The map connection between %s and %s succeed', self.serial,
                  self.sec_ad.serial)

  def map_disconnect(self) -> None:
    """Initiates a map disconnection to the connected device.

    Raises:
      signals.ControllerError: if the primary android device is missing.
      BluetoothProfileConnectionError: raised if failed to disconnect.
    """
    self._verify_pri_ad()
    if not self.pri_ad.map_disconnect(self.sec_ad_mac_address):
      raise BluetoothProfileConnectionError(
          'Failed to terminate the MAP connection with the device "%s".' %
          self.sec_ad_mac_address)

  def pbap_connect(self) -> None:
    """Establishes the pbap connection between self.pri_ad and self.sec_ad.

    Raises:
      signals.ControllerError: if the primary android device is missing.
    """
    # Same precondition as the other *_connect methods of this class.
    self._verify_pri_ad()
    connected = self.pri_ad.pbap_connect(self.sec_ad)
    asserts.assert_true(
        connected, 'The pbap connection between {} and {} failed'.format(
            self.serial, self.sec_ad.serial))
    self.log.info('The pbap connection between %s and %s succeed', self.serial,
                  self.sec_ad.serial)

  def pause(self) -> None:
    """Sends Avrcp pause command."""
    self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PAUSE, self.sec_ad)

  def play(self) -> None:
    """Sends Avrcp play command."""
    self.send_media_passthrough_cmd(bt_constants.CMD_MEDIA_PLAY, self.sec_ad)

  def power_on(self) -> bool:
    """Turns the Bluetooth on the android bt target device.

    Returns:
      The result of the sl4a call which toggles the Bluetooth state.
    """
    self.log.info('Turns on the bluetooth')
    return self.sl4a.bluetoothToggleState(True)

  def power_off(self) -> bool:
    """Turns the Bluetooth off the android bt target device.

    Returns:
      The result of the sl4a call which toggles the Bluetooth state.
    """
    self.log.info('Turns off the bluetooth')
    return self.sl4a.bluetoothToggleState(False)

  def route_call_audio(self, connect: bool = False) -> None:
    """Routes call audio during a call.

    Connects hfp first when it is not connected yet.

    Args:
      connect: True to connect the hfp client audio for sec_ad, False to
        disconnect it.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info(
        'Routes call audio during a call from hf phone %s for %s '
        'audio connection %s after routing', self.mac_address,
        self.sec_ad_mac_address, connect)
    if connect:
      self.sl4a.bluetoothHfpClientConnectAudio(self.sec_ad_mac_address)
    else:
      self.sl4a.bluetoothHfpClientDisconnectAudio(self.sec_ad_mac_address)

  def reject_phone_call(self) -> bool:
    """Rejects an incoming phone call.

    Connects hfp first when it is not connected yet.

    Returns:
      The result of the sl4a call which rejects the phone call.

    Raises:
      signals.ControllerError: if the device does not get into the ringing
        state within bt_constants.CALL_STATE_TIMEOUT_SEC.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    # Make sure the device is in ringing state.
    if not self.wait_for_call_state(
        bt_constants.CALL_STATE_RINGING, bt_constants.CALL_STATE_TIMEOUT_SEC):
      raise signals.ControllerError(
          'Timed out after %ds waiting for the device %s to be ringing state '
          'before rejecting the incoming phone call.' %
          (bt_constants.CALL_STATE_TIMEOUT_SEC, self.serial))
    self.log.info('Rejects the incoming phone call from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    return self.sl4a.bluetoothHfpClientRejectCall(self.sec_ad_mac_address)

  def set_audio_params(self, audio_params: Optional[Dict[str, str]]) -> None:
    """Sets audio params to the android_bt_target_device."""
    self.audio_params = audio_params

  def track_previous(self) -> None:
    """Sends Avrcp skip prev command."""
    self.send_media_passthrough_cmd(
        bt_constants.CMD_MEDIA_SKIP_PREV, self.sec_ad)

  def track_next(self) -> None:
    """Sends Avrcp skip next command."""
    self.send_media_passthrough_cmd(
        bt_constants.CMD_MEDIA_SKIP_NEXT, self.sec_ad)

  def start_audio_capture(self, duration_sec: int = 20) -> None:
    """Starts the audio capture over adb.

    Connects the a2dp sink first when it is not connected yet.

    Args:
      duration_sec: int, Number of seconds to record audio, 20 secs as default.
        A `duration` entry in the audio params takes precedence over this
        argument.

    Raises:
      MissingAudioParamsError: when self.audio_params is None
    """
    # Fail in the same way as stop_audio_capture instead of breaking on the
    # key lookup below.
    if self.audio_params is None:
      raise MissingAudioParamsError('Missing audio params for capturing audio')
    if 'duration' in self.audio_params:
      duration_sec = self.audio_params['duration']
    if not self.is_a2dp_sink_connected():
      self.a2dp_sink_connect()
    cmd = 'ap2f --usage 1 --start --duration {} --target {}'.format(
        duration_sec, self.adb_path)
    self.log.info('Starts capturing audio with adb shell command %s', cmd)
    self.adb.shell(cmd)

  def stop_audio_capture(self) -> str:
    """Stops the audio capture and stores it in wave file.

    The recorded pcm file is pulled from the device, removed on the device and
    converted into a new wave file on the host.

    Returns:
      File name of the recorded file.

    Raises:
      MissingAudioParamsError: when self.audio_params is None
    """
    if self.audio_params is None:
      raise MissingAudioParamsError('Missing audio params for capturing audio')
    if not self.is_a2dp_sink_connected():
      self.a2dp_sink_connect()
    adb_pull_args = [self.adb_path, self.audio_capture_path]
    self.log.info('start adb -s %s pull %s', self.serial, adb_pull_args)
    self._ad.adb.pull(adb_pull_args)
    pcm_file_path = os.path.join(self.audio_capture_path, ADB_FILE)
    self.log.info('delete the recored file %s', self.adb_path)
    self._ad.adb.shell('rm {}'.format(self.adb_path))
    wave_file_path = self.get_new_wave_file_path()
    self.log.info('convert pcm file %s to wav file %s', pcm_file_path,
                  wave_file_path)
    btutils.convert_pcm_to_wav(pcm_file_path, wave_file_path, self.audio_params)
    return wave_file_path

  def stop_all_services(self) -> None:
    """Stops all services for the pri_ad device."""
    self.log.info('Stops all services on the android bt target device')
    self._ad.services.stop_all()

  def stop_ambs_for_avrcp(self) -> None:
    """Stops media browser service for avrcp.

    Nothing is done when the device is not avrcp ready.
    """
    if self.is_avrcp_ready():
      self.log.info('Stops avrcp connection')
      self.sec_ad.sl4a.bluetoothMediaPhoneSL4AMBSStop()
      self.avrcp_ready = False

  def stop_voice_dial(self) -> None:
    """Stops voice dial.

    Connects hfp first when it is not connected yet.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info('Stops voice dial from hf phone %s for %s', self.mac_address,
                  self.sec_ad_mac_address)
    # Check the connection again, the command is only sent when connected.
    if self.is_hfp_connected():
      self.sl4a.bluetoothHfpClientStopVoiceRecognition(
          self.sec_ad_mac_address)

  def take_bug_report(self,
                      test_name: Optional[str] = None,
                      begin_time: Optional[int] = None,
                      timeout: float = 300,
                      destination: Optional[str] = None) -> None:
    """Wrapper method to capture bugreport on the android bt target device.

    All arguments are passed unchanged to `take_bug_report` of the underlying
    android device.
    """
    self._ad.take_bug_report(test_name, begin_time, timeout, destination)

  def voice_dial(self) -> None:
    """Triggers voice dial.

    Connects hfp first when it is not connected yet.
    """
    if not self.is_hfp_connected():
      self.hfp_connect()
    self.log.info('Triggers voice dial from hf phone %s for %s',
                  self.mac_address, self.sec_ad_mac_address)
    # Check the connection again, the command is only sent when connected.
    if self.is_hfp_connected():
      self.sl4a.bluetoothHfpClientStartVoiceRecognition(
          self.sec_ad_mac_address)

  def log_type(self) -> str:
    """Gets the log type of Android bt target device.

    Returns:
      A string, the log type of Android bt target device.
    """
    return bt_constants.LogType.BLUETOOTH_DEVICE_SIMULATOR.value
514
515
class BluetoothProfileConnectionError(Exception):
  """Raised when a Bluetooth profile connection cannot be changed as asked.

  Within this module it reports a MAP connection which failed to terminate.
  """
518
519
class MissingBtClientDeviceError(Exception):
  """Raised when the required bluetooth client device is not available.

  Within this module it reports an operation which needs the second android
  device before that device was added.
  """
522
523
class MissingAudioParamsError(Exception):
  """Raised when the audio params needed for an audio capture are not set."""
526