# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import avatar
import grpc
import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble.smp import PairingDelegate
from concurrent import futures
from contextlib import suppress
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_none  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import explicit_pass, fail  # type: ignore
from pandora.host_pb2 import (
    DISCOVERABLE_GENERAL,
    DISCOVERABLE_LIMITED,
    NOT_DISCOVERABLE,
    PUBLIC,
    RANDOM,
    DataTypes,
    DiscoverabilityMode,
    OwnAddressType,
)
from pandora.security_pb2 import LE_LEVEL3, LEVEL2, PairingEventAnswer
from typing import NoReturn, Optional


class ExampleTest(base_test.BaseTestClass):  # type: ignore[misc]
    """Example Mobly test class driving two Pandora devices (`dut` and `ref`).

    The tests exercise addresses, Classic and LE connections, discoverability,
    scan response data and pairing through the devices' `host` and `security`
    services.
    """

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice

    def setup_class(self) -> None:
        """Create the Pandora devices and keep the first two as `dut` and `ref`."""
        self.devices = PandoraDevices(self)
        self.dut, self.ref, *_ = self.devices

        # Enable BR/EDR mode for Bumble devices.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)

    def teardown_class(self) -> None:
        """Stop every device, if `setup_class` managed to create them."""
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:  # pytype: disable=wrong-arg-types
        """Reset both devices concurrently before each test."""
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    def test_print_addresses(self) -> None:
        """Log the address of each device."""
        dut_address = self.dut.address
        self.dut.log.info(f'Address: {dut_address}')
        ref_address = self.ref.address
        self.ref.log.info(f'Address: {ref_address}')

    def test_classic_connect(self) -> None:
        """Connect `ref` to `dut` by address, then disconnect."""
        dut_address = self.dut.address
        self.dut.log.info(f'Address: {dut_address}')
        connection = self.ref.host.Connect(address=dut_address).connection
        assert connection
        self.ref.log.info(f'Connected with: {dut_address}')
        self.ref.host.Disconnect(connection=connection)

    # Using this decorator allows us to write one `test_le_connect`, and
    # run it multiple times with different parameters.
    # Here we check that no matter the address type we use for both sides
    # the connection still completes.
    @avatar.parameterized(
        (RANDOM, RANDOM),
        (RANDOM, PUBLIC),
    )  # type: ignore[misc]
    def test_le_connect(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
        """Connect `dut` to an advertising `ref` over LE.

        Args:
            dut_address_type: own address type `dut` uses to scan and connect.
            ref_address_type: own address type `ref` uses to advertise.

        Raises:
            signals.TestSkip: if `ref` is not a Bumble device.
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device')

        advertisement = self.ref.host.Advertise(legacy=True, connectable=True, own_address_type=ref_address_type)
        # The `finally` clauses make sure both streams are cancelled even when
        # the scan or the connection attempt fails, like the inquiry tests do.
        try:
            scan = self.dut.host.Scan(own_address_type=dut_address_type)
            try:
                if ref_address_type == PUBLIC:
                    scan_response = next((x for x in scan if x.public == self.ref.address))
                    dut_ref = self.dut.host.ConnectLE(
                        public=scan_response.public,
                        own_address_type=dut_address_type,
                    ).connection
                else:
                    scan_response = next((x for x in scan if x.random == self.ref.random_address))
                    dut_ref = self.dut.host.ConnectLE(
                        random=scan_response.random,
                        own_address_type=dut_address_type,
                    ).connection
            finally:
                scan.cancel()
            ref_dut = next(advertisement).connection
        finally:
            advertisement.cancel()
        assert dut_ref and ref_dut
        self.dut.host.Disconnect(connection=dut_ref)

    @avatar.rpc_except(
        {
            # This test should reach the `Inquiry` timeout.
            grpc.StatusCode.DEADLINE_EXCEEDED: lambda e: explicit_pass(e.details()),
        }
    )
    def test_not_discoverable(self) -> None:
        """Check that `ref` does not find `dut` by inquiry when `dut` is not discoverable."""
        self.dut.host.SetDiscoverabilityMode(mode=NOT_DISCOVERABLE)
        inquiry = self.ref.host.Inquiry(timeout=3.0)
        try:
            assert_is_none(next((x for x in inquiry if x.address == self.dut.address), None))
        finally:
            inquiry.cancel()

    @avatar.parameterized(
        (DISCOVERABLE_LIMITED,),
        (DISCOVERABLE_GENERAL,),
    )  # type: ignore[misc]
    def test_discoverable(self, mode: DiscoverabilityMode) -> None:
        """Check that `ref` finds `dut` by inquiry once `dut` uses discoverability `mode`."""
        self.dut.host.SetDiscoverabilityMode(mode=mode)
        inquiry = self.ref.host.Inquiry(timeout=15.0)
        try:
            assert_is_not_none(next((x for x in inquiry if x.address == self.dut.address), None))
        finally:
            inquiry.cancel()

    @avatar.asynchronous
    async def test_wait_connection(self) -> None:  # pytype: disable=wrong-arg-types
        """Have `dut` wait for a connection while `ref` connects, then disconnect."""
        # Start waiting on the DUT side before the reference device connects.
        dut_ref_co = self.dut.aio.host.WaitConnection(address=self.ref.address)
        ref_dut = await self.ref.aio.host.Connect(address=self.dut.address)
        dut_ref = await dut_ref_co
        assert_is_not_none(ref_dut.connection)
        assert_is_not_none(dut_ref.connection)
        assert ref_dut.connection
        await self.ref.aio.host.Disconnect(connection=ref_dut.connection)

    def test_scan_response_data(self) -> None:
        """Advertise from `dut` and check the types of the data fields `ref` scans."""
        advertisement = self.dut.host.Advertise(
            legacy=True,
            data=DataTypes(
                complete_service_class_uuids16=['FDF0'],
            ),
            scan_response_data=DataTypes(
                include_class_of_device=True,
            ),
        )

        # Always release both streams, even if the scan raises.
        try:
            scan = self.ref.host.Scan()
            try:
                scan_response = next((x for x in scan if x.public == self.dut.address))
            finally:
                scan.cancel()
        finally:
            advertisement.cancel()

        assert_equal(type(scan_response.data.class_of_device), int)
        assert_equal(type(scan_response.data.complete_service_class_uuids16[0]), str)

    async def handle_pairing_events(self) -> NoReturn:
        """Answer pairing events from both devices until cancelled.

        Each iteration waits for one pairing event from each device, then
        confirms numeric-comparison / just-works requests on both sides, or
        forwards the passkey notified on one side to the side requesting it.
        Both `OnPairing` streams are cancelled when the coroutine exits.
        """
        ref_pairing_stream = self.ref.aio.security.OnPairing()
        dut_pairing_stream = self.dut.aio.security.OnPairing()

        try:
            while True:
                ref_pairing_event, dut_pairing_event = await asyncio.gather(
                    anext(ref_pairing_stream),  # pytype: disable=name-error
                    anext(dut_pairing_stream),  # pytype: disable=name-error
                )

                if dut_pairing_event.method_variant() in ('numeric_comparison', 'just_works'):
                    assert_in(ref_pairing_event.method_variant(), ('numeric_comparison', 'just_works'))
                    dut_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=dut_pairing_event,
                            confirm=True,
                        )
                    )
                    ref_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=ref_pairing_event,
                            confirm=True,
                        )
                    )
                elif dut_pairing_event.method_variant() == 'passkey_entry_notification':
                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request')
                    ref_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=ref_pairing_event,
                            passkey=dut_pairing_event.passkey_entry_notification,
                        )
                    )
                elif dut_pairing_event.method_variant() == 'passkey_entry_request':
                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_notification')
                    dut_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=dut_pairing_event,
                            passkey=ref_pairing_event.passkey_entry_notification,
                        )
                    )
                else:
                    fail("unreachable")

        finally:
            ref_pairing_stream.cancel()
            dut_pairing_stream.cancel()

    @staticmethod
    async def _stop_pairing_handler(pairing: 'asyncio.Task[NoReturn]') -> None:
        """Cancel the pairing-handler task and wait for it to finish.

        Cancellation errors are suppressed; any other exception the task ended
        with (e.g. a failed assertion in the handler) propagates to the caller.
        """
        pairing.cancel()
        with suppress(asyncio.CancelledError, futures.CancelledError):
            await pairing

    @avatar.parameterized(
        (PairingDelegate.NO_OUTPUT_NO_INPUT,),
        (PairingDelegate.KEYBOARD_INPUT_ONLY,),
        (PairingDelegate.DISPLAY_OUTPUT_ONLY,),
        (PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,),
        (PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,),
    )  # type: ignore[misc]
    @avatar.asynchronous
    async def test_classic_pairing(self, ref_io_capability: int) -> None:  # pytype: disable=wrong-arg-types
        """Connect the devices over Classic and secure the link at `LEVEL2`.

        Args:
            ref_io_capability: IO capability to set on the reference device.

        Raises:
            signals.TestSkip: if `ref` is not a Bumble device.
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # override reference device IO capability
        setattr(self.ref.device, 'io_capability', ref_io_capability)

        pairing = asyncio.create_task(self.handle_pairing_events())
        # Stop the background handler on every exit path so that a failure
        # here does not leave the task and its pairing streams running.
        try:
            (dut_ref_res, ref_dut_res) = await asyncio.gather(
                self.dut.aio.host.WaitConnection(address=self.ref.address),
                self.ref.aio.host.Connect(address=self.dut.address),
            )

            assert_equal(ref_dut_res.result_variant(), 'connection')
            assert_equal(dut_ref_res.result_variant(), 'connection')
            ref_dut = ref_dut_res.connection
            dut_ref = dut_ref_res.connection
            assert ref_dut and dut_ref

            (secure, wait_security) = await asyncio.gather(
                self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2),
                self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2),
            )
        finally:
            await self._stop_pairing_handler(pairing)

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        await asyncio.gather(
            self.dut.aio.host.Disconnect(connection=dut_ref),
            self.ref.aio.host.WaitDisconnection(connection=ref_dut),
        )

    @avatar.parameterized(
        (RANDOM, RANDOM, PairingDelegate.NO_OUTPUT_NO_INPUT),
        (RANDOM, RANDOM, PairingDelegate.KEYBOARD_INPUT_ONLY),
        (RANDOM, RANDOM, PairingDelegate.DISPLAY_OUTPUT_ONLY),
        (RANDOM, RANDOM, PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT),
        (RANDOM, RANDOM, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
        (RANDOM, PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
    )  # type: ignore[misc]
    @avatar.asynchronous
    async def test_le_pairing(  # pytype: disable=wrong-arg-types
        self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, ref_io_capability: int
    ) -> None:
        """Connect `ref` to an advertising `dut` over LE and secure the link at `LE_LEVEL3`.

        Args:
            dut_address_type: own address type `dut` uses to advertise.
            ref_address_type: own address type `ref` uses to scan and connect.
            ref_io_capability: IO capability to set on the reference device.

        Raises:
            signals.TestSkip: if `ref` is not a Bumble device.
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # override reference device IO capability
        setattr(self.ref.device, 'io_capability', ref_io_capability)

        advertisement = self.dut.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=dut_address_type,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        try:
            scan = self.ref.aio.host.Scan(own_address_type=ref_address_type)
            try:
                # The manufacturer specific data identifies the DUT's advertisement.
                dut = await anext(
                    (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
                )  # pytype: disable=name-error
            finally:
                scan.cancel()
            assert dut

            pairing = asyncio.create_task(self.handle_pairing_events())
            # Stop the background handler on every exit path so that a failure
            # here does not leave the task and its pairing streams running.
            try:
                (ref_dut_res, dut_ref_res) = await asyncio.gather(
                    self.ref.aio.host.ConnectLE(own_address_type=ref_address_type, **dut.address_asdict()),
                    anext(aiter(advertisement)),  # pytype: disable=name-error
                )

                # Stop advertising as soon as the connection is established.
                advertisement.cancel()
                ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
                assert ref_dut and dut_ref

                (secure, wait_security) = await asyncio.gather(
                    self.ref.aio.security.Secure(connection=ref_dut, le=LE_LEVEL3),
                    self.dut.aio.security.WaitSecurity(connection=dut_ref, le=LE_LEVEL3),
                )
            finally:
                await self._stop_pairing_handler(pairing)

            assert_equal(secure.result_variant(), 'success')
            assert_equal(wait_security.result_variant(), 'success')

            await asyncio.gather(
                self.dut.aio.host.Disconnect(connection=dut_ref),
                self.ref.aio.host.WaitDisconnection(connection=ref_dut),
            )
        finally:
            # Covers the failure paths before the explicit cancel above; gRPC
            # documents `cancel()` as idempotent, so a second call is harmless.
            advertisement.cancel()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore