1# Copyright (C) 2022 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Fast Pair provider simulator role.""" 16 17import time 18 19from mobly import asserts 20from mobly.controllers import android_device 21from mobly.controllers.android_device_lib import jsonrpc_client_base 22from mobly.controllers.android_device_lib import snippet_event 23from typing import Optional 24 25from test_helper import event_helper 26 27# The package name of the provider simulator snippet. 28FP_PROVIDER_SIMULATOR_SNIPPETS_PACKAGE = 'android.nearby.multidevices' 29 30# Events reported from the provider simulator snippet. 31ON_A2DP_SINK_PROFILE_CONNECT_EVENT = 'onA2DPSinkProfileConnected' 32ON_SCAN_MODE_CHANGE_EVENT = 'onScanModeChange' 33ON_ADVERTISING_CHANGE_EVENT = 'onAdvertisingChange' 34 35# Target scan mode. 36DISCOVERABLE_MODE = 'DISCOVERABLE' 37 38# Abbreviations for common use type. 39AndroidDevice = android_device.AndroidDevice 40SnippetEvent = snippet_event.SnippetEvent 41wait_for_event = event_helper.wait_callback_event 42 43 44class FastPairProviderSimulator: 45 """A proxy for provider simulator snippet on the device.""" 46 47 def __init__(self, ad: AndroidDevice) -> None: 48 self._ad = ad 49 self._ad.debug_tag = 'FastPairProviderSimulator' 50 self._provider_status_callback = None 51 52 def load_snippet(self) -> None: 53 """Starts the provider simulator snippet and connects. 54 55 Raises: 56 SnippetError: Illegal load operations are attempted. 57 """ 58 self._ad.load_snippet( 59 name='fp', package=FP_PROVIDER_SIMULATOR_SNIPPETS_PACKAGE) 60 61 def setup_provider_simulator(self, timeout_seconds: int) -> None: 62 """Sets up the Fast Pair provider simulator. 63 64 Args: 65 timeout_seconds: The number of seconds to wait before giving up. 66 """ 67 setup_status_callback = self._ad.fp.setupProviderSimulator() 68 69 def _on_a2dp_sink_profile_connect_event_received(_, elapsed_time: int) -> bool: 70 self._ad.log.info('Provider simulator connected to A2DP sink in %d seconds.', 71 elapsed_time) 72 return True 73 74 def _on_a2dp_sink_profile_connect_event_waiting(elapsed_time: int) -> None: 75 self._ad.log.info( 76 'Still waiting "%s" event callback from provider side ' 77 'after %d seconds...', ON_A2DP_SINK_PROFILE_CONNECT_EVENT, elapsed_time) 78 79 def _on_a2dp_sink_profile_connect_event_missed() -> None: 80 asserts.fail(f'Timed out after {timeout_seconds} seconds waiting for ' 81 f'the specific "{ON_A2DP_SINK_PROFILE_CONNECT_EVENT}" event.') 82 83 wait_for_event( 84 callback_event_handler=setup_status_callback, 85 event_name=ON_A2DP_SINK_PROFILE_CONNECT_EVENT, 86 timeout_seconds=timeout_seconds, 87 on_received=_on_a2dp_sink_profile_connect_event_received, 88 on_waiting=_on_a2dp_sink_profile_connect_event_waiting, 89 on_missed=_on_a2dp_sink_profile_connect_event_missed) 90 91 def start_model_id_advertising(self, model_id: str, anti_spoofing_key: str) -> None: 92 """Starts model id advertising for scanning and initial pairing. 93 94 Args: 95 model_id: A 3-byte hex string for seeker side to recognize the device (ex: 96 0x00000C). 97 anti_spoofing_key: A public key for registered headsets. 98 """ 99 self._ad.log.info( 100 'Provider simulator starts advertising as model id "%s" with anti-spoofing key "%s".', 101 model_id, anti_spoofing_key) 102 self._provider_status_callback = ( 103 self._ad.fp.startModelIdAdvertising(model_id, anti_spoofing_key)) 104 105 def teardown_provider_simulator(self) -> None: 106 """Tears down the Fast Pair provider simulator.""" 107 self._ad.fp.teardownProviderSimulator() 108 109 def get_ble_mac_address(self) -> str: 110 """Gets Bluetooth low energy mac address of the provider simulator. 111 112 The BLE mac address will be set by the AdvertisingSet.getOwnAddress() 113 callback. This is the callback flow in the custom Android build. It takes 114 a while after advertising started so we use retry here to wait it. 115 116 Returns: 117 The BLE mac address of the Fast Pair provider simulator. 118 """ 119 for _ in range(3): 120 try: 121 return self._ad.fp.getBluetoothLeAddress() 122 except jsonrpc_client_base.ApiError: 123 time.sleep(1) 124 125 def wait_for_discoverable_mode(self, timeout_seconds: int) -> None: 126 """Waits onScanModeChange event to ensure provider is discoverable. 127 128 Args: 129 timeout_seconds: The number of seconds to wait before giving up. 130 """ 131 132 def _on_scan_mode_change_event_received( 133 scan_mode_change_event: SnippetEvent, elapsed_time: int) -> bool: 134 scan_mode = scan_mode_change_event.data['mode'] 135 self._ad.log.info( 136 'Provider simulator changed the scan mode to %s in %d seconds.', 137 scan_mode, elapsed_time) 138 return scan_mode == DISCOVERABLE_MODE 139 140 def _on_scan_mode_change_event_waiting(elapsed_time: int) -> None: 141 self._ad.log.info( 142 'Still waiting "%s" event callback from provider side ' 143 'after %d seconds...', ON_SCAN_MODE_CHANGE_EVENT, elapsed_time) 144 145 def _on_scan_mode_change_event_missed() -> None: 146 asserts.fail(f'Timed out after {timeout_seconds} seconds waiting for ' 147 f'the specific "{ON_SCAN_MODE_CHANGE_EVENT}" event.') 148 149 wait_for_event( 150 callback_event_handler=self._provider_status_callback, 151 event_name=ON_SCAN_MODE_CHANGE_EVENT, 152 timeout_seconds=timeout_seconds, 153 on_received=_on_scan_mode_change_event_received, 154 on_waiting=_on_scan_mode_change_event_waiting, 155 on_missed=_on_scan_mode_change_event_missed) 156 157 def wait_for_advertising_start(self, timeout_seconds: int) -> None: 158 """Waits onAdvertisingChange event to ensure provider is advertising. 159 160 Args: 161 timeout_seconds: The number of seconds to wait before giving up. 162 """ 163 164 def _on_advertising_mode_change_event_received( 165 scan_mode_change_event: SnippetEvent, elapsed_time: int) -> bool: 166 advertising_mode = scan_mode_change_event.data['isAdvertising'] 167 self._ad.log.info( 168 'Provider simulator changed the advertising mode to %s in %d seconds.', 169 advertising_mode, elapsed_time) 170 return advertising_mode 171 172 def _on_advertising_mode_change_event_waiting(elapsed_time: int) -> None: 173 self._ad.log.info( 174 'Still waiting "%s" event callback from provider side ' 175 'after %d seconds...', ON_ADVERTISING_CHANGE_EVENT, elapsed_time) 176 177 def _on_advertising_mode_change_event_missed() -> None: 178 asserts.fail(f'Timed out after {timeout_seconds} seconds waiting for ' 179 f'the specific "{ON_ADVERTISING_CHANGE_EVENT}" event.') 180 181 wait_for_event( 182 callback_event_handler=self._provider_status_callback, 183 event_name=ON_ADVERTISING_CHANGE_EVENT, 184 timeout_seconds=timeout_seconds, 185 on_received=_on_advertising_mode_change_event_received, 186 on_waiting=_on_advertising_mode_change_event_waiting, 187 on_missed=_on_advertising_mode_change_event_missed) 188 189 def get_latest_received_account_key(self) -> Optional[str]: 190 """Gets the latest account key received on the provider side. 191 192 Returns: 193 The account key received at provider side. 194 """ 195 return self._ad.fp.getLatestReceivedAccountKey() 196