1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import avatar 17import enum 18import grpc 19import logging 20 21from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous, bumble_server 22from bumble.gatt import GATT_ASHA_SERVICE 23from bumble.pairing import PairingDelegate 24from bumble_experimental.asha import AshaGattService, AshaService 25from mobly import base_test, signals, test_runner 26from mobly.asserts import assert_equal # type: ignore 27from mobly.asserts import assert_false # type: ignore 28from mobly.asserts import assert_in # type: ignore 29from mobly.asserts import assert_is_not_none # type: ignore 30from mobly.asserts import assert_true # type: ignore 31from pandora._utils import AioStream 32from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse 33from pandora.security_pb2 import LE_LEVEL3 34from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server 35from typing import List, Optional, Tuple 36 37ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-') 38HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8] 39COMPLETE_LOCAL_NAME: str = "Bumble" 40AUDIO_SIGNAL_AMPLITUDE = 0.8 41AUDIO_SIGNAL_SAMPLING_RATE = 44100 42 43 44class Ear(enum.IntEnum): 45 """Reference devices type""" 46 47 LEFT = 0 48 RIGHT = 1 49 50 51class ASHATest(base_test.BaseTestClass): # type: ignore[misc] 52 devices: Optional[PandoraDevices] = None 53 54 # pandora devices. 55 dut: PandoraDevice 56 ref_left: BumblePandoraDevice 57 ref_right: BumblePandoraDevice 58 59 def setup_class(self) -> None: 60 # Register experimental bumble servicers hook. 61 bumble_server.register_servicer_hook( 62 lambda bumble, _, server: add_AshaServicer_to_server(AshaService(bumble.device), server) 63 ) 64 65 self.devices = PandoraDevices(self) 66 self.dut, ref_left, ref_right, *_ = self.devices 67 68 if isinstance(self.dut, BumblePandoraDevice): 69 raise signals.TestAbortClass('DUT Bumble does not support Asha source') 70 if not isinstance(ref_left, BumblePandoraDevice): 71 raise signals.TestAbortClass('Test require Bumble as reference device(s)') 72 if not isinstance(ref_right, BumblePandoraDevice): 73 raise signals.TestAbortClass('Test require Bumble as reference device(s)') 74 75 self.ref_left, self.ref_right = ref_left, ref_right 76 77 def teardown_class(self) -> None: 78 if self.devices: 79 self.devices.stop_all() 80 81 @avatar.asynchronous 82 async def setup_test(self) -> None: 83 await asyncio.gather(self.dut.reset(), self.ref_left.reset(), self.ref_right.reset()) 84 85 # ASHA hearing aid's IO capability is NO_OUTPUT_NO_INPUT 86 self.ref_left.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT 87 self.ref_right.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT 88 89 async def ref_advertise_asha( 90 self, ref_device: PandoraDevice, ref_address_type: OwnAddressType, ear: Ear 91 ) -> AioStream[AdvertiseResponse]: 92 """ 93 Ref device starts to advertise with service data in advertisement data. 94 :return: Ref device's advertise stream 95 """ 96 # Ref starts advertising with ASHA service data 97 asha = AioAsha(ref_device.aio.channel) 98 await asha.Register(capability=ear, hisyncid=HISYCNID) 99 return ref_device.aio.host.Advertise( 100 legacy=True, 101 connectable=True, 102 own_address_type=ref_address_type, 103 data=DataTypes( 104 complete_local_name=COMPLETE_LOCAL_NAME, 105 incomplete_service_class_uuids16=[ASHA_UUID], 106 ), 107 ) 108 109 async def dut_scan_for_asha(self, dut_address_type: OwnAddressType, ear: Ear) -> ScanningResponse: 110 """ 111 DUT starts to scan for the Ref device. 112 :return: ScanningResponse for ASHA 113 """ 114 dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type) 115 expected_advertisement_data = self.get_expected_advertisement_data(ear) 116 ref = await anext( 117 ( 118 x 119 async for x in dut_scan 120 if ( 121 ASHA_UUID in x.data.incomplete_service_class_uuids16 122 and expected_advertisement_data == (x.data.service_data_uuid16[ASHA_UUID]).hex() 123 ) 124 ) 125 ) 126 dut_scan.cancel() 127 return ref 128 129 async def dut_connect_to_ref( 130 self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType 131 ) -> Tuple[Connection, Connection]: 132 """ 133 Helper method for Dut connects to Ref 134 :return: a Tuple (DUT to REF connection, REF to DUT connection) 135 """ 136 (dut_ref_res, ref_dut_res) = await asyncio.gather( 137 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), 138 anext(aiter(advertisement)), # pytype: disable=name-error 139 ) 140 assert_equal(dut_ref_res.result_variant(), 'connection') 141 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 142 assert_is_not_none(dut_ref) 143 assert dut_ref 144 advertisement.cancel() 145 return dut_ref, ref_dut 146 147 async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool: 148 try: 149 await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout) 150 return False 151 except grpc.RpcError as e: 152 assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED) # type: ignore 153 return True 154 155 def get_expected_advertisement_data(self, ear: Ear) -> str: 156 protocol_version = 0x01 157 truncated_hisyncid = HISYCNID[:4] 158 return ( 159 "{:02x}".format(protocol_version) 160 + "{:02x}".format(ear) 161 + "".join([("{:02x}".format(x)) for x in truncated_hisyncid]) 162 ) 163 164 def get_le_psm_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[int]: 165 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 166 le_psm_future = asyncio.get_running_loop().create_future() 167 168 def le_psm_handler(connection: Connection, data: int) -> None: 169 le_psm_future.set_result(data) 170 171 asha_service.on('le_psm_out', le_psm_handler) 172 return le_psm_future 173 174 def get_read_only_properties_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[bytes]: 175 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 176 read_only_properties_future = asyncio.get_running_loop().create_future() 177 178 def read_only_properties_handler(connection: Connection, data: bytes) -> None: 179 read_only_properties_future.set_result(data) 180 181 asha_service.on('read_only_properties', read_only_properties_handler) 182 return read_only_properties_future 183 184 def get_start_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[dict[str, int]]: 185 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 186 start_future = asyncio.get_running_loop().create_future() 187 188 def start_command_handler(connection: Connection, data: dict[str, int]) -> None: 189 start_future.set_result(data) 190 191 asha_service.on('start', start_command_handler) 192 return start_future 193 194 def get_stop_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[Connection]: 195 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 196 stop_future = asyncio.get_running_loop().create_future() 197 198 def stop_command_handler(connection: Connection) -> None: 199 stop_future.set_result(connection) 200 201 asha_service.on('stop', stop_command_handler) 202 return stop_future 203 204 @avatar.parameterized( 205 (RANDOM, Ear.LEFT), 206 (RANDOM, Ear.RIGHT), 207 ) # type: ignore[misc] 208 @asynchronous 209 async def test_advertising_advertisement_data( 210 self, 211 ref_address_type: OwnAddressType, 212 ear: Ear, 213 ) -> None: 214 """ 215 Ref starts ASHA advertisements with service data in advertisement data. 216 DUT starts a service discovery. 217 Verify Ref is correctly discovered by DUT as a hearing aid device. 218 """ 219 advertisement = await self.ref_advertise_asha(self.ref_left, ref_address_type, ear) 220 221 # DUT starts a service discovery 222 scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 223 advertisement.cancel() 224 225 # Verify Ref is correctly discovered by DUT as a hearing aid device 226 assert_in(ASHA_UUID, scan_result.data.service_data_uuid16) 227 assert_equal(type(scan_result.data.complete_local_name), str) 228 expected_advertisement_data = self.get_expected_advertisement_data(ear) 229 assert_equal( 230 expected_advertisement_data, 231 (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(), 232 ) 233 234 @asynchronous 235 async def test_advertising_scan_response(self) -> None: 236 """ 237 Ref starts ASHA advertisements with service data in scan response data. 238 DUT starts a service discovery. 239 Verify Ref is correctly discovered by DUT as a hearing aid device. 240 """ 241 asha = AioAsha(self.ref_left.aio.channel) 242 await asha.Register(capability=Ear.LEFT, hisyncid=HISYCNID) 243 244 # advertise with ASHA service data in scan response 245 advertisement = self.ref_left.aio.host.Advertise( 246 legacy=True, 247 scan_response_data=DataTypes( 248 complete_local_name=COMPLETE_LOCAL_NAME, 249 complete_service_class_uuids16=[ASHA_UUID], 250 ), 251 ) 252 253 scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT) 254 advertisement.cancel() 255 256 # Verify Ref is correctly discovered by DUT as a hearing aid device. 257 assert_in(ASHA_UUID, scan_result.data.service_data_uuid16) 258 expected_advertisement_data = self.get_expected_advertisement_data(Ear.LEFT) 259 assert_equal( 260 expected_advertisement_data, 261 (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(), 262 ) 263 264 @avatar.parameterized( 265 (RANDOM, PUBLIC), 266 (RANDOM, RANDOM), 267 ) # type: ignore[misc] 268 @asynchronous 269 async def test_pairing( 270 self, 271 dut_address_type: OwnAddressType, 272 ref_address_type: OwnAddressType, 273 ) -> None: 274 """ 275 DUT discovers Ref. 276 DUT initiates connection to Ref. 277 Verify that DUT and Ref are bonded and connected. 278 """ 279 advertisement = await self.ref_advertise_asha( 280 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 281 ) 282 283 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 284 285 # DUT initiates connection to Ref. 286 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 287 288 # DUT starts pairing with the Ref. 289 (secure, wait_security) = await asyncio.gather( 290 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 291 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 292 ) 293 294 assert_equal(secure.result_variant(), 'success') 295 assert_equal(wait_security.result_variant(), 'success') 296 297 @avatar.parameterized( 298 (RANDOM, PUBLIC), 299 (RANDOM, RANDOM), 300 ) # type: ignore[misc] 301 @asynchronous 302 async def test_pairing_dual_device( 303 self, 304 dut_address_type: OwnAddressType, 305 ref_address_type: OwnAddressType, 306 ) -> None: 307 """ 308 DUT discovers Ref. 309 DUT initiates connection to Ref. 310 Verify that DUT and Ref are bonded and connected. 311 """ 312 313 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 314 advertisement = await self.ref_advertise_asha( 315 ref_device=ref_device, ref_address_type=ref_address_type, ear=ear 316 ) 317 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear) 318 # DUT initiates connection to ref_device. 319 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 320 advertisement.cancel() 321 322 return dut_ref, ref_dut 323 324 ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather( 325 ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT) 326 ) 327 328 # DUT starts pairing with the ref_left 329 (secure_left, wait_security_left) = await asyncio.gather( 330 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 331 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 332 ) 333 334 assert_equal(secure_left.result_variant(), 'success') 335 assert_equal(wait_security_left.result_variant(), 'success') 336 337 # DUT starts pairing with the ref_right 338 (secure_right, wait_security_right) = await asyncio.gather( 339 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 340 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 341 ) 342 343 assert_equal(secure_right.result_variant(), 'success') 344 assert_equal(wait_security_right.result_variant(), 'success') 345 346 await asyncio.gather( 347 self.ref_left.aio.host.Disconnect(connection=ref_left_dut), 348 self.dut.aio.host.WaitDisconnection(connection=dut_ref_left), 349 ) 350 await asyncio.gather( 351 self.ref_right.aio.host.Disconnect(connection=ref_right_dut), 352 self.dut.aio.host.WaitDisconnection(connection=dut_ref_right), 353 ) 354 355 @avatar.parameterized( 356 (RANDOM, PUBLIC), 357 (RANDOM, RANDOM), 358 ) # type: ignore[misc] 359 @asynchronous 360 async def test_unbonding( 361 self, 362 dut_address_type: OwnAddressType, 363 ref_address_type: OwnAddressType, 364 ) -> None: 365 """ 366 DUT removes bond with Ref. 367 Verify that DUT and Ref are disconnected and unbonded. 368 """ 369 raise signals.TestSkip("TODO: update rootcanal to retry") 370 371 advertisement = await self.ref_advertise_asha( 372 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 373 ) 374 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 375 376 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 377 378 secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3) 379 380 assert_equal(secure.WhichOneof("result"), "success") 381 await self.dut.aio.host.Disconnect(dut_ref) 382 await self.ref_left.aio.host.WaitDisconnection(ref_dut) 383 384 # delete the bond 385 await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address) 386 387 # DUT connect to REF again 388 dut_ref = ( 389 await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()) 390 ).connection 391 # TODO very likely there is a bug in android here 392 logging.debug("result should come out") 393 394 advertisement.cancel() 395 assert_is_not_none(dut_ref) 396 397 secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3) 398 399 assert_equal(secure.WhichOneof("result"), "success") 400 401 @avatar.parameterized( 402 (RANDOM, RANDOM), 403 (RANDOM, PUBLIC), 404 ) # type: ignore[misc] 405 @asynchronous 406 async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: 407 """ 408 DUT discovers Ref. 409 DUT initiates connection to Ref. 410 Verify that DUT and Ref are connected. 411 """ 412 advertisement = await self.ref_advertise_asha( 413 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 414 ) 415 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 416 417 _, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 418 419 @avatar.parameterized( 420 (RANDOM, RANDOM), 421 (RANDOM, PUBLIC), 422 ) # type: ignore[misc] 423 @asynchronous 424 async def test_disconnect_initiator( 425 self, 426 dut_address_type: OwnAddressType, 427 ref_address_type: OwnAddressType, 428 ) -> None: 429 """ 430 DUT initiates disconnection to Ref. 431 Verify that DUT and Ref are disconnected. 432 """ 433 advertisement = await self.ref_advertise_asha( 434 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 435 ) 436 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 437 438 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 439 advertisement.cancel() 440 441 await self.dut.aio.host.Disconnect(connection=dut_ref) 442 assert_false(await self.is_device_connected(self.ref_left, ref_dut, 5), "Should be disconnected") 443 444 @avatar.parameterized( 445 (RANDOM, RANDOM), 446 (RANDOM, PUBLIC), 447 ) # type: ignore[misc] 448 @asynchronous 449 async def test_disconnect_initiator_dual_device( 450 self, 451 dut_address_type: OwnAddressType, 452 ref_address_type: OwnAddressType, 453 ) -> None: 454 """ 455 DUT initiates disconnection to Ref. 456 Verify that DUT and Ref are disconnected. 457 """ 458 459 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 460 advertisement = await self.ref_advertise_asha( 461 ref_device=ref_device, ref_address_type=ref_address_type, ear=ear 462 ) 463 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear) 464 # DUT initiates connection to ref_device. 465 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 466 advertisement.cancel() 467 468 return dut_ref, ref_dut 469 470 ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather( 471 ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT) 472 ) 473 474 # Disconnect from DUT 475 await asyncio.gather( 476 self.dut.aio.host.Disconnect(connection=dut_ref_left), 477 self.dut.aio.host.Disconnect(connection=dut_ref_right), 478 ) 479 480 # Verify the Refs are disconnected 481 assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 482 assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 483 484 @avatar.parameterized( 485 (RANDOM, RANDOM), 486 (RANDOM, PUBLIC), 487 ) # type: ignore[misc] 488 @asynchronous 489 async def test_disconnect_acceptor( 490 self, 491 dut_address_type: OwnAddressType, 492 ref_address_type: OwnAddressType, 493 ) -> None: 494 """ 495 Ref initiates disconnection to DUT (typically when put back in its box). 496 Verify that Ref is disconnected. 497 """ 498 advertisement = await self.ref_advertise_asha( 499 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 500 ) 501 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 502 503 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 504 advertisement.cancel() 505 506 await self.ref_left.aio.host.Disconnect(connection=ref_dut) 507 assert_false(await self.is_device_connected(self.dut, dut_ref, 5), "Should be disconnected") 508 509 @avatar.parameterized( 510 (RANDOM, RANDOM, 0), 511 (RANDOM, RANDOM, 0.5), 512 (RANDOM, RANDOM, 1), 513 (RANDOM, RANDOM, 5), 514 ) # type: ignore[misc] 515 @asynchronous 516 async def test_reconnection( 517 self, 518 dut_address_type: OwnAddressType, 519 ref_address_type: OwnAddressType, 520 reconnection_gap: float, 521 ) -> None: 522 """ 523 DUT initiates disconnection to the Ref. 524 Verify that DUT and Ref are disconnected. 525 DUT reconnects to Ref after various certain time. 526 Verify that DUT and Ref are connected. 527 """ 528 529 async def connect_and_disconnect() -> None: 530 advertisement = await self.ref_advertise_asha( 531 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 532 ) 533 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 534 dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 535 await self.dut.aio.host.Disconnect(connection=dut_ref) 536 537 await connect_and_disconnect() 538 # simulating reconnect interval 539 await asyncio.sleep(reconnection_gap) 540 await connect_and_disconnect() 541 542 @avatar.parameterized( 543 (RANDOM, RANDOM), 544 (RANDOM, PUBLIC), 545 ) # type: ignore[misc] 546 @asynchronous 547 async def test_auto_connection( 548 self, 549 dut_address_type: OwnAddressType, 550 ref_address_type: OwnAddressType, 551 ) -> None: 552 """ 553 Ref initiates disconnection to DUT. 554 Ref starts sending ASHA advertisements. 555 Verify that DUT auto-connects to Ref. 556 """ 557 advertisement = await self.ref_advertise_asha( 558 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 559 ) 560 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 561 562 # manually connect and not cancel advertisement 563 dut_ref_res, ref_dut_res = await asyncio.gather( 564 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), 565 anext(aiter(advertisement)), # pytype: disable=name-error 566 ) 567 assert_equal(dut_ref_res.result_variant(), 'connection') 568 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 569 assert_is_not_none(dut_ref) 570 assert dut_ref 571 572 # Pairing 573 (secure, wait_security) = await asyncio.gather( 574 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 575 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 576 ) 577 assert_equal(secure.result_variant(), 'success') 578 assert_equal(wait_security.result_variant(), 'success') 579 580 await self.ref_left.aio.host.Disconnect(connection=ref_dut) 581 582 ref_dut = (await anext(aiter(advertisement))).connection 583 advertisement.cancel() 584 585 @avatar.parameterized( 586 (RANDOM, RANDOM, Ear.LEFT), 587 (RANDOM, PUBLIC, Ear.RIGHT), 588 ) # type: ignore[misc] 589 @asynchronous 590 async def test_disconnect_acceptor_dual_device( 591 self, 592 dut_address_type: OwnAddressType, 593 ref_address_type: OwnAddressType, 594 disconnect_device: Ear, 595 ) -> None: 596 """ 597 Prerequisites: DUT and Ref are connected and bonded. 598 Description: 599 1. One peripheral of Ref initiates disconnection to DUT. 600 2. Verify that it is disconnected and that the other peripheral is still connected. 601 """ 602 603 advertisement_left = await self.ref_advertise_asha( 604 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 605 ) 606 ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 607 _, ref_left_dut = await self.dut_connect_to_ref( 608 advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type 609 ) 610 advertisement_left.cancel() 611 612 advertisement_right = await self.ref_advertise_asha( 613 ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT 614 ) 615 ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT) 616 _, ref_right_dut = await self.dut_connect_to_ref( 617 advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type 618 ) 619 advertisement_right.cancel() 620 621 if disconnect_device == Ear.LEFT: 622 await self.ref_left.aio.host.Disconnect(connection=ref_left_dut) 623 assert_true(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 624 assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 625 else: 626 await self.ref_right.aio.host.Disconnect(connection=ref_right_dut) 627 assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 628 assert_true(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 629 630 @avatar.parameterized( 631 (RANDOM, RANDOM, Ear.LEFT), 632 (RANDOM, RANDOM, Ear.RIGHT), 633 (RANDOM, PUBLIC, Ear.LEFT), 634 (RANDOM, PUBLIC, Ear.RIGHT), 635 ) # type: ignore[misc] 636 @asynchronous 637 async def test_auto_connection_dual_device( 638 self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, tested_device: Ear 639 ) -> None: 640 """ 641 Prerequisites: DUT and Ref are connected and bonded. Ref is a dual device. 642 Description: 643 1. One peripheral of Ref initiates disconnection to DUT. 644 2. The disconnected peripheral starts sending ASHA advertisements. 645 3. Verify that DUT auto-connects to the peripheral. 646 """ 647 648 advertisement_left = await self.ref_advertise_asha( 649 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 650 ) 651 ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 652 (dut_ref_left_res, ref_left_dut_res) = await asyncio.gather( 653 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_left.address_asdict()), 654 anext(aiter(advertisement_left)), # pytype: disable=name-error 655 ) 656 assert_equal(dut_ref_left_res.result_variant(), 'connection') 657 dut_ref_left, ref_left_dut = dut_ref_left_res.connection, ref_left_dut_res.connection 658 assert_is_not_none(dut_ref_left) 659 assert dut_ref_left 660 advertisement_left.cancel() 661 662 advertisement_right = await self.ref_advertise_asha( 663 ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT 664 ) 665 ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT) 666 (dut_ref_right_res, ref_right_dut_res) = await asyncio.gather( 667 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_right.address_asdict()), 668 anext(aiter(advertisement_right)), # pytype: disable=name-error 669 ) 670 assert_equal(dut_ref_right_res.result_variant(), 'connection') 671 dut_ref_right, ref_right_dut = dut_ref_right_res.connection, ref_right_dut_res.connection 672 assert_is_not_none(dut_ref_right) 673 assert dut_ref_right 674 advertisement_right.cancel() 675 676 # Pairing 677 (secure_left, wait_security_left) = await asyncio.gather( 678 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 679 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 680 ) 681 assert_equal(secure_left.result_variant(), 'success') 682 assert_equal(wait_security_left.result_variant(), 'success') 683 684 (secure_right, wait_security_right) = await asyncio.gather( 685 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 686 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 687 ) 688 assert_equal(secure_right.result_variant(), 'success') 689 assert_equal(wait_security_right.result_variant(), 'success') 690 691 if tested_device == Ear.LEFT: 692 await asyncio.gather( 693 self.ref_left.aio.host.Disconnect(connection=ref_left_dut), 694 self.dut.aio.host.WaitDisconnection(connection=dut_ref_left), 695 ) 696 assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 697 698 advertisement_left = await self.ref_advertise_asha( 699 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 700 ) 701 ref_left_dut = (await anext(aiter(advertisement_left))).connection 702 advertisement_left.cancel() 703 else: 704 await asyncio.gather( 705 self.ref_right.aio.host.Disconnect(connection=ref_right_dut), 706 self.dut.aio.host.WaitDisconnection(connection=dut_ref_right), 707 ) 708 assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 709 710 advertisement_right = await self.ref_advertise_asha( 711 ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT 712 ) 713 ref_right_dut = (await anext(aiter(advertisement_right))).connection 714 advertisement_right.cancel() 715 716 @asynchronous 717 async def test_music_start(self) -> None: 718 """ 719 DUT discovers Ref. 720 DUT initiates connection to Ref. 721 Verify that DUT and Ref are bonded and connected. 722 DUT starts media streaming. 723 Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1, 724 audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>). 725 """ 726 727 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 728 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 729 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 730 # DUT initiates connection to ref_device. 731 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 732 advertisement.cancel() 733 734 return dut_ref, ref_dut 735 736 dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 737 le_psm_future = self.get_le_psm_future(self.ref_left) 738 read_only_properties_future = self.get_read_only_properties_future(self.ref_left) 739 740 # DUT starts pairing with the ref_left 741 (secure, wait_security) = await asyncio.gather( 742 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 743 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 744 ) 745 746 assert_equal(secure.result_variant(), 'success') 747 assert_equal(wait_security.result_variant(), 'success') 748 749 le_psm_out_result = await asyncio.wait_for(le_psm_future, timeout=3.0) 750 assert_is_not_none(le_psm_out_result) 751 752 read_only_properties_result = await asyncio.wait_for(read_only_properties_future, timeout=3.0) 753 assert_is_not_none(read_only_properties_result) 754 755 dut_asha = AioAsha(self.dut.aio.channel) 756 start_future = self.get_start_future(self.ref_left) 757 758 logging.info("send start") 759 await dut_asha.WaitPeripheral(connection=dut_ref) 760 _, start_result = await asyncio.gather( 761 dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0) 762 ) 763 764 logging.info(f"start_result:{start_result}") 765 assert_is_not_none(start_result) 766 assert_equal(start_result['codec'], 1) 767 assert_equal(start_result['audiotype'], 0) 768 assert_is_not_none(start_result['volume']) 769 assert_equal(start_result['otherstate'], 0) 770 771 @asynchronous 772 async def test_set_volume(self) -> None: 773 """ 774 DUT discovers Ref. 775 DUT initiates connection to Ref. 776 Verify that DUT and Ref are bonded and connected. 777 DUT is streaming media to Ref. 778 Change volume on DUT. 779 Verify DUT writes the correct value to ASHA `Volume` characteristic. 780 """ 781 raise signals.TestSkip("TODO: update bt test interface for SetVolume to retry") 782 783 advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT) 784 785 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT) 786 787 # DUT initiates connection to Ref. 788 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 789 790 # DUT starts pairing with the ref_left 791 (secure, wait_security) = await asyncio.gather( 792 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 793 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 794 ) 795 796 assert_equal(secure.result_variant(), 'success') 797 assert_equal(wait_security.result_variant(), 'success') 798 799 asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService))) 800 dut_asha = AioAsha(self.dut.aio.channel) 801 802 volume_future = asyncio.get_running_loop().create_future() 803 804 def volume_command_handler(connection: Connection, data: int): 805 volume_future.set_result(data) 806 807 asha_service.on('volume', volume_command_handler) 808 809 await dut_asha.WaitPeripheral(connection=dut_ref) 810 await dut_asha.Start(connection=dut_ref) 811 # set volume to max volume 812 _, volume_result = await asyncio.gather(dut_asha.SetVolume(1), asyncio.wait_for(volume_future, timeout=3.0)) 813 814 logging.info(f"start_result:{volume_result}") 815 assert_is_not_none(volume_result) 816 assert_equal(volume_result, 0) 817 818 @asynchronous 819 async def test_music_stop(self) -> None: 820 """ 821 DUT discovers Ref. 822 DUT initiates connection to Ref. 823 Verify that DUT and Ref are bonded and connected. 824 DUT is streaming media to Ref. 825 DUT stops media streaming on Ref. 826 Verify that DUT sends a correct AudioControlPoint `Stop` command. 827 """ 828 829 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 830 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 831 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 832 # DUT initiates connection to ref_device. 833 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 834 advertisement.cancel() 835 836 return dut_ref, ref_dut 837 838 dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 839 840 # DUT starts pairing with the ref_left 841 (secure, wait_security) = await asyncio.gather( 842 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 843 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 844 ) 845 846 assert_equal(secure.result_variant(), 'success') 847 assert_equal(wait_security.result_variant(), 'success') 848 849 dut_asha = AioAsha(self.dut.aio.channel) 850 851 stop_future = self.get_stop_future(self.ref_left) 852 853 await dut_asha.WaitPeripheral(connection=dut_ref) 854 await dut_asha.Start(connection=dut_ref) 855 logging.info("send stop") 856 _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0)) 857 858 logging.info(f"stop_result:{stop_result}") 859 assert_is_not_none(stop_result) 860 861 ref_asha = AioAsha(self.ref_left.aio.channel) 862 try: 863 ref_asha.CaptureAudio(connection=ref_dut, timeout=2) 864 except grpc.aio.AioRpcError as e: 865 if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: 866 logging.info("no audio data, work as expected") 867 else: 868 raise e 869 870 @asynchronous 871 async def test_music_restart(self) -> None: 872 """ 873 DUT discovers Ref. 874 DUT initiates connection to Ref. 875 Verify that DUT and Ref are bonded and connected. 876 DUT starts media streaming. 877 DUT stops media streaming. 878 Verify that DUT sends a correct AudioControlPoint `Stop` command. 879 DUT starts media streaming again. 880 Verify that DUT sends a correct AudioControlPoint `Start` command. 881 """ 882 883 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 884 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 885 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 886 # DUT initiates connection to ref_device. 887 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 888 advertisement.cancel() 889 890 return dut_ref, ref_dut 891 892 dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 893 894 # DUT starts pairing with the ref_left 895 (secure, wait_security) = await asyncio.gather( 896 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 897 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 898 ) 899 900 assert_equal(secure.result_variant(), 'success') 901 assert_equal(wait_security.result_variant(), 'success') 902 903 dut_asha = AioAsha(self.dut.aio.channel) 904 905 stop_future = self.get_stop_future(self.ref_left) 906 907 await dut_asha.WaitPeripheral(connection=dut_ref) 908 await dut_asha.Start(connection=dut_ref) 909 _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0)) 910 911 logging.info(f"stop_result:{stop_result}") 912 assert_is_not_none(stop_result) 913 914 # restart music streaming 915 logging.info("restart music streaming") 916 917 start_future = self.get_start_future(self.ref_left) 918 919 await dut_asha.WaitPeripheral(connection=dut_ref) 920 _, start_result = await asyncio.gather( 921 dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0) 922 ) 923 924 logging.info(f"start_result:{start_result}") 925 assert_is_not_none(start_result) 926 927 @asynchronous 928 async def test_music_start_dual_device(self) -> None: 929 """ 930 DUT discovers Ref. 931 DUT initiates connection to Ref. 932 Verify that DUT and Ref are bonded and connected. 933 DUT starts media streaming. 934 Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1, 935 audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>). 936 """ 937 938 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 939 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 940 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 941 # DUT initiates connection to ref_device. 942 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 943 advertisement.cancel() 944 945 return dut_ref, ref_dut 946 947 # connect ref_left 948 dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 949 le_psm_future_left = self.get_le_psm_future(self.ref_left) 950 read_only_properties_future_left = self.get_read_only_properties_future(self.ref_left) 951 952 # DUT starts pairing with the ref_left 953 (secure_left, wait_security_left) = await asyncio.gather( 954 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 955 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 956 ) 957 958 assert_equal(secure_left.result_variant(), 'success') 959 assert_equal(wait_security_left.result_variant(), 'success') 960 961 le_psm_out_result_left = await asyncio.wait_for(le_psm_future_left, timeout=3.0) 962 assert_is_not_none(le_psm_out_result_left) 963 964 read_only_properties_result_left = await asyncio.wait_for(read_only_properties_future_left, timeout=3.0) 965 assert_is_not_none(read_only_properties_result_left) 966 967 dut_asha = AioAsha(self.dut.aio.channel) 968 start_future_left = self.get_start_future(self.ref_left) 969 970 logging.info("send start") 971 await dut_asha.WaitPeripheral(connection=dut_ref_left) 972 _, start_result_left = await asyncio.gather( 973 dut_asha.Start(connection=dut_ref_left), asyncio.wait_for(start_future_left, timeout=3.0) 974 ) 975 976 logging.info(f"start_result_left:{start_result_left}") 977 assert_is_not_none(start_result_left) 978 assert_equal(start_result_left['codec'], 1) 979 assert_equal(start_result_left['audiotype'], 0) 980 assert_is_not_none(start_result_left['volume']) 981 assert_equal(start_result_left['otherstate'], 0) 982 983 # connect ref_right 984 dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT) 985 le_psm_future_right = self.get_le_psm_future(self.ref_right) 986 read_only_properties_future_right = self.get_read_only_properties_future(self.ref_right) 987 988 # DUT starts pairing with the ref_right 989 (secure_right, wait_security_right) = await asyncio.gather( 990 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 991 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 992 ) 993 994 assert_equal(secure_right.result_variant(), 'success') 995 assert_equal(wait_security_right.result_variant(), 'success') 996 997 le_psm_out_result_right = await asyncio.wait_for(le_psm_future_right, timeout=3.0) 998 assert_is_not_none(le_psm_out_result_right) 999 1000 read_only_properties_result_right = await asyncio.wait_for(read_only_properties_future_right, timeout=3.0) 1001 assert_is_not_none(read_only_properties_result_right) 1002 1003 start_future_right = self.get_start_future(self.ref_right) 1004 1005 logging.info("send start_right") 1006 await dut_asha.WaitPeripheral(connection=dut_ref_right) 1007 start_result_right = await asyncio.wait_for(start_future_right, timeout=10.0) 1008 1009 logging.info(f"start_result_right:{start_result_right}") 1010 assert_is_not_none(start_result_right) 1011 assert_equal(start_result_right['codec'], 1) 1012 assert_equal(start_result_right['audiotype'], 0) 1013 assert_is_not_none(start_result_right['volume']) 1014 # ref_left already connected, otherstate = 1 1015 assert_equal(start_result_right['otherstate'], 1) 1016 1017 1018if __name__ == "__main__": 1019 logging.basicConfig(level=logging.DEBUG) 1020 test_runner.main() # type: ignore 1021