• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import enum
18import grpc
19import logging
20
21from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous, bumble_server
22from bumble.gatt import GATT_ASHA_SERVICE
23from bumble.pairing import PairingDelegate
24from bumble_experimental.asha import AshaGattService, AshaService
25from mobly import base_test, signals, test_runner
26from mobly.asserts import assert_equal  # type: ignore
27from mobly.asserts import assert_false  # type: ignore
28from mobly.asserts import assert_in  # type: ignore
29from mobly.asserts import assert_is_not_none  # type: ignore
30from mobly.asserts import assert_true  # type: ignore
31from pandora._utils import AioStream
32from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
33from pandora.security_pb2 import LE_LEVEL3
34from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server
35from typing import List, Optional, Tuple
36
# ASHA GATT service UUID as a '-'-separated hex string; used below both as an
# entry of the advertised service UUID list and as the key of the service data.
ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-')

# HiSyncId passed to the ASHA `Register` call of every reference device
# (the constant keeps its historical spelling because other code refers to it).
HISYCNID: List[int] = list(range(0x01, 0x09))

# Complete local name put in the reference devices' advertising data.
COMPLETE_LOCAL_NAME: str = "Bumble"

# Audio signal parameters; not referenced in the visible part of the file
# (presumably used by the streaming tests further down -- verify).
AUDIO_SIGNAL_AMPLITUDE = 0.8
AUDIO_SIGNAL_SAMPLING_RATE = 44_100
42
43
@enum.unique
class Ear(enum.IntEnum):
    """Ear played by a reference device; passed as `capability` when registering the ASHA service."""

    LEFT = 0
    RIGHT = 1
49
50
51class ASHATest(base_test.BaseTestClass):  # type: ignore[misc]
52    devices: Optional[PandoraDevices] = None
53
54    # pandora devices.
55    dut: PandoraDevice
56    ref_left: BumblePandoraDevice
57    ref_right: BumblePandoraDevice
58
59    def setup_class(self) -> None:
60        # Register experimental bumble servicers hook.
61        bumble_server.register_servicer_hook(
62            lambda bumble, _, server: add_AshaServicer_to_server(AshaService(bumble.device), server)
63        )
64
65        self.devices = PandoraDevices(self)
66        self.dut, ref_left, ref_right, *_ = self.devices
67
68        if isinstance(self.dut, BumblePandoraDevice):
69            raise signals.TestAbortClass('DUT Bumble does not support Asha source')
70        if not isinstance(ref_left, BumblePandoraDevice):
71            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
72        if not isinstance(ref_right, BumblePandoraDevice):
73            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
74
75        self.ref_left, self.ref_right = ref_left, ref_right
76
77    def teardown_class(self) -> None:
78        if self.devices:
79            self.devices.stop_all()
80
81    @avatar.asynchronous
82    async def setup_test(self) -> None:
83        await asyncio.gather(self.dut.reset(), self.ref_left.reset(), self.ref_right.reset())
84
85        # ASHA hearing aid's IO capability is NO_OUTPUT_NO_INPUT
86        self.ref_left.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT
87        self.ref_right.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT
88
89    async def ref_advertise_asha(
90        self, ref_device: PandoraDevice, ref_address_type: OwnAddressType, ear: Ear
91    ) -> AioStream[AdvertiseResponse]:
92        """
93        Ref device starts to advertise with service data in advertisement data.
94        :return: Ref device's advertise stream
95        """
96        # Ref starts advertising with ASHA service data
97        asha = AioAsha(ref_device.aio.channel)
98        await asha.Register(capability=ear, hisyncid=HISYCNID)
99        return ref_device.aio.host.Advertise(
100            legacy=True,
101            connectable=True,
102            own_address_type=ref_address_type,
103            data=DataTypes(
104                complete_local_name=COMPLETE_LOCAL_NAME,
105                incomplete_service_class_uuids16=[ASHA_UUID],
106            ),
107        )
108
109    async def dut_scan_for_asha(self, dut_address_type: OwnAddressType, ear: Ear) -> ScanningResponse:
110        """
111        DUT starts to scan for the Ref device.
112        :return: ScanningResponse for ASHA
113        """
114        dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type)
115        expected_advertisement_data = self.get_expected_advertisement_data(ear)
116        ref = await anext(
117            (
118                x
119                async for x in dut_scan
120                if (
121                    ASHA_UUID in x.data.incomplete_service_class_uuids16
122                    and expected_advertisement_data == (x.data.service_data_uuid16[ASHA_UUID]).hex()
123                )
124            )
125        )
126        dut_scan.cancel()
127        return ref
128
129    async def dut_connect_to_ref(
130        self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType
131    ) -> Tuple[Connection, Connection]:
132        """
133        Helper method for Dut connects to Ref
134        :return: a Tuple (DUT to REF connection, REF to DUT connection)
135        """
136        (dut_ref_res, ref_dut_res) = await asyncio.gather(
137            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()),
138            anext(aiter(advertisement)),  # pytype: disable=name-error
139        )
140        assert_equal(dut_ref_res.result_variant(), 'connection')
141        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
142        assert_is_not_none(dut_ref)
143        assert dut_ref
144        advertisement.cancel()
145        return dut_ref, ref_dut
146
147    async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool:
148        try:
149            await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout)
150            return False
151        except grpc.RpcError as e:
152            assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED)  # type: ignore
153            return True
154
155    def get_expected_advertisement_data(self, ear: Ear) -> str:
156        protocol_version = 0x01
157        truncated_hisyncid = HISYCNID[:4]
158        return (
159            "{:02x}".format(protocol_version)
160            + "{:02x}".format(ear)
161            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
162        )
163
164    def get_le_psm_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[int]:
165        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
166        le_psm_future = asyncio.get_running_loop().create_future()
167
168        def le_psm_handler(connection: Connection, data: int) -> None:
169            le_psm_future.set_result(data)
170
171        asha_service.on('le_psm_out', le_psm_handler)
172        return le_psm_future
173
174    def get_read_only_properties_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[bytes]:
175        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
176        read_only_properties_future = asyncio.get_running_loop().create_future()
177
178        def read_only_properties_handler(connection: Connection, data: bytes) -> None:
179            read_only_properties_future.set_result(data)
180
181        asha_service.on('read_only_properties', read_only_properties_handler)
182        return read_only_properties_future
183
184    def get_start_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[dict[str, int]]:
185        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
186        start_future = asyncio.get_running_loop().create_future()
187
188        def start_command_handler(connection: Connection, data: dict[str, int]) -> None:
189            start_future.set_result(data)
190
191        asha_service.on('start', start_command_handler)
192        return start_future
193
194    def get_stop_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[Connection]:
195        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
196        stop_future = asyncio.get_running_loop().create_future()
197
198        def stop_command_handler(connection: Connection) -> None:
199            stop_future.set_result(connection)
200
201        asha_service.on('stop', stop_command_handler)
202        return stop_future
203
204    @avatar.parameterized(
205        (RANDOM, Ear.LEFT),
206        (RANDOM, Ear.RIGHT),
207    )  # type: ignore[misc]
208    @asynchronous
209    async def test_advertising_advertisement_data(
210        self,
211        ref_address_type: OwnAddressType,
212        ear: Ear,
213    ) -> None:
214        """
215        Ref starts ASHA advertisements with service data in advertisement data.
216        DUT starts a service discovery.
217        Verify Ref is correctly discovered by DUT as a hearing aid device.
218        """
219        advertisement = await self.ref_advertise_asha(self.ref_left, ref_address_type, ear)
220
221        # DUT starts a service discovery
222        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
223        advertisement.cancel()
224
225        # Verify Ref is correctly discovered by DUT as a hearing aid device
226        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
227        assert_equal(type(scan_result.data.complete_local_name), str)
228        expected_advertisement_data = self.get_expected_advertisement_data(ear)
229        assert_equal(
230            expected_advertisement_data,
231            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
232        )
233
234    @asynchronous
235    async def test_advertising_scan_response(self) -> None:
236        """
237        Ref starts ASHA advertisements with service data in scan response data.
238        DUT starts a service discovery.
239        Verify Ref is correctly discovered by DUT as a hearing aid device.
240        """
241        asha = AioAsha(self.ref_left.aio.channel)
242        await asha.Register(capability=Ear.LEFT, hisyncid=HISYCNID)
243
244        # advertise with ASHA service data in scan response
245        advertisement = self.ref_left.aio.host.Advertise(
246            legacy=True,
247            scan_response_data=DataTypes(
248                complete_local_name=COMPLETE_LOCAL_NAME,
249                complete_service_class_uuids16=[ASHA_UUID],
250            ),
251        )
252
253        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)
254        advertisement.cancel()
255
256        # Verify Ref is correctly discovered by DUT as a hearing aid device.
257        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
258        expected_advertisement_data = self.get_expected_advertisement_data(Ear.LEFT)
259        assert_equal(
260            expected_advertisement_data,
261            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
262        )
263
264    @avatar.parameterized(
265        (RANDOM, PUBLIC),
266        (RANDOM, RANDOM),
267    )  # type: ignore[misc]
268    @asynchronous
269    async def test_pairing(
270        self,
271        dut_address_type: OwnAddressType,
272        ref_address_type: OwnAddressType,
273    ) -> None:
274        """
275        DUT discovers Ref.
276        DUT initiates connection to Ref.
277        Verify that DUT and Ref are bonded and connected.
278        """
279        advertisement = await self.ref_advertise_asha(
280            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
281        )
282
283        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
284
285        # DUT initiates connection to Ref.
286        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
287
288        # DUT starts pairing with the Ref.
289        (secure, wait_security) = await asyncio.gather(
290            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
291            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
292        )
293
294        assert_equal(secure.result_variant(), 'success')
295        assert_equal(wait_security.result_variant(), 'success')
296
297    @avatar.parameterized(
298        (RANDOM, PUBLIC),
299        (RANDOM, RANDOM),
300    )  # type: ignore[misc]
301    @asynchronous
302    async def test_pairing_dual_device(
303        self,
304        dut_address_type: OwnAddressType,
305        ref_address_type: OwnAddressType,
306    ) -> None:
307        """
308        DUT discovers Ref.
309        DUT initiates connection to Ref.
310        Verify that DUT and Ref are bonded and connected.
311        """
312
313        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
314            advertisement = await self.ref_advertise_asha(
315                ref_device=ref_device, ref_address_type=ref_address_type, ear=ear
316            )
317            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear)
318            # DUT initiates connection to ref_device.
319            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
320            advertisement.cancel()
321
322            return dut_ref, ref_dut
323
324        ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather(
325            ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT)
326        )
327
328        # DUT starts pairing with the ref_left
329        (secure_left, wait_security_left) = await asyncio.gather(
330            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
331            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
332        )
333
334        assert_equal(secure_left.result_variant(), 'success')
335        assert_equal(wait_security_left.result_variant(), 'success')
336
337        # DUT starts pairing with the ref_right
338        (secure_right, wait_security_right) = await asyncio.gather(
339            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
340            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
341        )
342
343        assert_equal(secure_right.result_variant(), 'success')
344        assert_equal(wait_security_right.result_variant(), 'success')
345
346        await asyncio.gather(
347            self.ref_left.aio.host.Disconnect(connection=ref_left_dut),
348            self.dut.aio.host.WaitDisconnection(connection=dut_ref_left),
349        )
350        await asyncio.gather(
351            self.ref_right.aio.host.Disconnect(connection=ref_right_dut),
352            self.dut.aio.host.WaitDisconnection(connection=dut_ref_right),
353        )
354
355    @avatar.parameterized(
356        (RANDOM, PUBLIC),
357        (RANDOM, RANDOM),
358    )  # type: ignore[misc]
359    @asynchronous
360    async def test_unbonding(
361        self,
362        dut_address_type: OwnAddressType,
363        ref_address_type: OwnAddressType,
364    ) -> None:
365        """
366        DUT removes bond with Ref.
367        Verify that DUT and Ref are disconnected and unbonded.
368        """
369        raise signals.TestSkip("TODO: update rootcanal to retry")
370
371        advertisement = await self.ref_advertise_asha(
372            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
373        )
374        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
375
376        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
377
378        secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3)
379
380        assert_equal(secure.WhichOneof("result"), "success")
381        await self.dut.aio.host.Disconnect(dut_ref)
382        await self.ref_left.aio.host.WaitDisconnection(ref_dut)
383
384        # delete the bond
385        await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address)
386
387        # DUT connect to REF again
388        dut_ref = (
389            await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict())
390        ).connection
391        # TODO very likely there is a bug in android here
392        logging.debug("result should come out")
393
394        advertisement.cancel()
395        assert_is_not_none(dut_ref)
396
397        secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
398
399        assert_equal(secure.WhichOneof("result"), "success")
400
401    @avatar.parameterized(
402        (RANDOM, RANDOM),
403        (RANDOM, PUBLIC),
404    )  # type: ignore[misc]
405    @asynchronous
406    async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
407        """
408        DUT discovers Ref.
409        DUT initiates connection to Ref.
410        Verify that DUT and Ref are connected.
411        """
412        advertisement = await self.ref_advertise_asha(
413            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
414        )
415        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
416
417        _, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
418
419    @avatar.parameterized(
420        (RANDOM, RANDOM),
421        (RANDOM, PUBLIC),
422    )  # type: ignore[misc]
423    @asynchronous
424    async def test_disconnect_initiator(
425        self,
426        dut_address_type: OwnAddressType,
427        ref_address_type: OwnAddressType,
428    ) -> None:
429        """
430        DUT initiates disconnection to Ref.
431        Verify that DUT and Ref are disconnected.
432        """
433        advertisement = await self.ref_advertise_asha(
434            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
435        )
436        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
437
438        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
439        advertisement.cancel()
440
441        await self.dut.aio.host.Disconnect(connection=dut_ref)
442        assert_false(await self.is_device_connected(self.ref_left, ref_dut, 5), "Should be disconnected")
443
444    @avatar.parameterized(
445        (RANDOM, RANDOM),
446        (RANDOM, PUBLIC),
447    )  # type: ignore[misc]
448    @asynchronous
449    async def test_disconnect_initiator_dual_device(
450        self,
451        dut_address_type: OwnAddressType,
452        ref_address_type: OwnAddressType,
453    ) -> None:
454        """
455        DUT initiates disconnection to Ref.
456        Verify that DUT and Ref are disconnected.
457        """
458
459        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
460            advertisement = await self.ref_advertise_asha(
461                ref_device=ref_device, ref_address_type=ref_address_type, ear=ear
462            )
463            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear)
464            # DUT initiates connection to ref_device.
465            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
466            advertisement.cancel()
467
468            return dut_ref, ref_dut
469
470        ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather(
471            ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT)
472        )
473
474        # Disconnect from DUT
475        await asyncio.gather(
476            self.dut.aio.host.Disconnect(connection=dut_ref_left),
477            self.dut.aio.host.Disconnect(connection=dut_ref_right),
478        )
479
480        # Verify the Refs are disconnected
481        assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
482        assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
483
484    @avatar.parameterized(
485        (RANDOM, RANDOM),
486        (RANDOM, PUBLIC),
487    )  # type: ignore[misc]
488    @asynchronous
489    async def test_disconnect_acceptor(
490        self,
491        dut_address_type: OwnAddressType,
492        ref_address_type: OwnAddressType,
493    ) -> None:
494        """
495        Ref initiates disconnection to DUT (typically when put back in its box).
496        Verify that Ref is disconnected.
497        """
498        advertisement = await self.ref_advertise_asha(
499            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
500        )
501        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
502
503        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
504        advertisement.cancel()
505
506        await self.ref_left.aio.host.Disconnect(connection=ref_dut)
507        assert_false(await self.is_device_connected(self.dut, dut_ref, 5), "Should be disconnected")
508
509    @avatar.parameterized(
510        (RANDOM, RANDOM, 0),
511        (RANDOM, RANDOM, 0.5),
512        (RANDOM, RANDOM, 1),
513        (RANDOM, RANDOM, 5),
514    )  # type: ignore[misc]
515    @asynchronous
516    async def test_reconnection(
517        self,
518        dut_address_type: OwnAddressType,
519        ref_address_type: OwnAddressType,
520        reconnection_gap: float,
521    ) -> None:
522        """
523        DUT initiates disconnection to the Ref.
524        Verify that DUT and Ref are disconnected.
525        DUT reconnects to Ref after various certain time.
526        Verify that DUT and Ref are connected.
527        """
528
529        async def connect_and_disconnect() -> None:
530            advertisement = await self.ref_advertise_asha(
531                ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
532            )
533            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
534            dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
535            await self.dut.aio.host.Disconnect(connection=dut_ref)
536
537        await connect_and_disconnect()
538        # simulating reconnect interval
539        await asyncio.sleep(reconnection_gap)
540        await connect_and_disconnect()
541
542    @avatar.parameterized(
543        (RANDOM, RANDOM),
544        (RANDOM, PUBLIC),
545    )  # type: ignore[misc]
546    @asynchronous
547    async def test_auto_connection(
548        self,
549        dut_address_type: OwnAddressType,
550        ref_address_type: OwnAddressType,
551    ) -> None:
552        """
553        Ref initiates disconnection to DUT.
554        Ref starts sending ASHA advertisements.
555        Verify that DUT auto-connects to Ref.
556        """
557        advertisement = await self.ref_advertise_asha(
558            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
559        )
560        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
561
562        # manually connect and not cancel advertisement
563        dut_ref_res, ref_dut_res = await asyncio.gather(
564            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()),
565            anext(aiter(advertisement)),  # pytype: disable=name-error
566        )
567        assert_equal(dut_ref_res.result_variant(), 'connection')
568        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
569        assert_is_not_none(dut_ref)
570        assert dut_ref
571
572        # Pairing
573        (secure, wait_security) = await asyncio.gather(
574            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
575            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
576        )
577        assert_equal(secure.result_variant(), 'success')
578        assert_equal(wait_security.result_variant(), 'success')
579
580        await self.ref_left.aio.host.Disconnect(connection=ref_dut)
581
582        ref_dut = (await anext(aiter(advertisement))).connection
583        advertisement.cancel()
584
585    @avatar.parameterized(
586        (RANDOM, RANDOM, Ear.LEFT),
587        (RANDOM, PUBLIC, Ear.RIGHT),
588    )  # type: ignore[misc]
589    @asynchronous
590    async def test_disconnect_acceptor_dual_device(
591        self,
592        dut_address_type: OwnAddressType,
593        ref_address_type: OwnAddressType,
594        disconnect_device: Ear,
595    ) -> None:
596        """
597        Prerequisites: DUT and Ref are connected and bonded.
598        Description:
599           1. One peripheral of Ref initiates disconnection to DUT.
600           2. Verify that it is disconnected and that the other peripheral is still connected.
601        """
602
603        advertisement_left = await self.ref_advertise_asha(
604            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
605        )
606        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
607        _, ref_left_dut = await self.dut_connect_to_ref(
608            advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type
609        )
610        advertisement_left.cancel()
611
612        advertisement_right = await self.ref_advertise_asha(
613            ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
614        )
615        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT)
616        _, ref_right_dut = await self.dut_connect_to_ref(
617            advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type
618        )
619        advertisement_right.cancel()
620
621        if disconnect_device == Ear.LEFT:
622            await self.ref_left.aio.host.Disconnect(connection=ref_left_dut)
623            assert_true(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
624            assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
625        else:
626            await self.ref_right.aio.host.Disconnect(connection=ref_right_dut)
627            assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
628            assert_true(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
629
630    @avatar.parameterized(
631        (RANDOM, RANDOM, Ear.LEFT),
632        (RANDOM, RANDOM, Ear.RIGHT),
633        (RANDOM, PUBLIC, Ear.LEFT),
634        (RANDOM, PUBLIC, Ear.RIGHT),
635    )  # type: ignore[misc]
636    @asynchronous
637    async def test_auto_connection_dual_device(
638        self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, tested_device: Ear
639    ) -> None:
640        """
641        Prerequisites: DUT and Ref are connected and bonded. Ref is a dual device.
642        Description:
643           1. One peripheral of Ref initiates disconnection to DUT.
644           2. The disconnected peripheral starts sending ASHA advertisements.
645           3. Verify that DUT auto-connects to the peripheral.
646        """
647
648        advertisement_left = await self.ref_advertise_asha(
649            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
650        )
651        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
652        (dut_ref_left_res, ref_left_dut_res) = await asyncio.gather(
653            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_left.address_asdict()),
654            anext(aiter(advertisement_left)),  # pytype: disable=name-error
655        )
656        assert_equal(dut_ref_left_res.result_variant(), 'connection')
657        dut_ref_left, ref_left_dut = dut_ref_left_res.connection, ref_left_dut_res.connection
658        assert_is_not_none(dut_ref_left)
659        assert dut_ref_left
660        advertisement_left.cancel()
661
662        advertisement_right = await self.ref_advertise_asha(
663            ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
664        )
665        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT)
666        (dut_ref_right_res, ref_right_dut_res) = await asyncio.gather(
667            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_right.address_asdict()),
668            anext(aiter(advertisement_right)),  # pytype: disable=name-error
669        )
670        assert_equal(dut_ref_right_res.result_variant(), 'connection')
671        dut_ref_right, ref_right_dut = dut_ref_right_res.connection, ref_right_dut_res.connection
672        assert_is_not_none(dut_ref_right)
673        assert dut_ref_right
674        advertisement_right.cancel()
675
676        # Pairing
677        (secure_left, wait_security_left) = await asyncio.gather(
678            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
679            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
680        )
681        assert_equal(secure_left.result_variant(), 'success')
682        assert_equal(wait_security_left.result_variant(), 'success')
683
684        (secure_right, wait_security_right) = await asyncio.gather(
685            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
686            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
687        )
688        assert_equal(secure_right.result_variant(), 'success')
689        assert_equal(wait_security_right.result_variant(), 'success')
690
691        if tested_device == Ear.LEFT:
692            await asyncio.gather(
693                self.ref_left.aio.host.Disconnect(connection=ref_left_dut),
694                self.dut.aio.host.WaitDisconnection(connection=dut_ref_left),
695            )
696            assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
697
698            advertisement_left = await self.ref_advertise_asha(
699                ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
700            )
701            ref_left_dut = (await anext(aiter(advertisement_left))).connection
702            advertisement_left.cancel()
703        else:
704            await asyncio.gather(
705                self.ref_right.aio.host.Disconnect(connection=ref_right_dut),
706                self.dut.aio.host.WaitDisconnection(connection=dut_ref_right),
707            )
708            assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
709
710            advertisement_right = await self.ref_advertise_asha(
711                ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
712            )
713            ref_right_dut = (await anext(aiter(advertisement_right))).connection
714            advertisement_right.cancel()
715
716    @asynchronous
717    async def test_music_start(self) -> None:
718        """
719        DUT discovers Ref.
720        DUT initiates connection to Ref.
721        Verify that DUT and Ref are bonded and connected.
722        DUT starts media streaming.
723        Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1,
724        audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>).
725        """
726
727        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
728            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
729            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
730            # DUT initiates connection to ref_device.
731            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
732            advertisement.cancel()
733
734            return dut_ref, ref_dut
735
736        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
737        le_psm_future = self.get_le_psm_future(self.ref_left)
738        read_only_properties_future = self.get_read_only_properties_future(self.ref_left)
739
740        # DUT starts pairing with the ref_left
741        (secure, wait_security) = await asyncio.gather(
742            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
743            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
744        )
745
746        assert_equal(secure.result_variant(), 'success')
747        assert_equal(wait_security.result_variant(), 'success')
748
749        le_psm_out_result = await asyncio.wait_for(le_psm_future, timeout=3.0)
750        assert_is_not_none(le_psm_out_result)
751
752        read_only_properties_result = await asyncio.wait_for(read_only_properties_future, timeout=3.0)
753        assert_is_not_none(read_only_properties_result)
754
755        dut_asha = AioAsha(self.dut.aio.channel)
756        start_future = self.get_start_future(self.ref_left)
757
758        logging.info("send start")
759        await dut_asha.WaitPeripheral(connection=dut_ref)
760        _, start_result = await asyncio.gather(
761            dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0)
762        )
763
764        logging.info(f"start_result:{start_result}")
765        assert_is_not_none(start_result)
766        assert_equal(start_result['codec'], 1)
767        assert_equal(start_result['audiotype'], 0)
768        assert_is_not_none(start_result['volume'])
769        assert_equal(start_result['otherstate'], 0)
770
771    @asynchronous
772    async def test_set_volume(self) -> None:
773        """
774        DUT discovers Ref.
775        DUT initiates connection to Ref.
776        Verify that DUT and Ref are bonded and connected.
777        DUT is streaming media to Ref.
778        Change volume on DUT.
779        Verify DUT writes the correct value to ASHA `Volume` characteristic.
780        """
781        raise signals.TestSkip("TODO: update bt test interface for SetVolume to retry")
782
783        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT)
784
785        ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)
786
787        # DUT initiates connection to Ref.
788        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
789
790        # DUT starts pairing with the ref_left
791        (secure, wait_security) = await asyncio.gather(
792            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
793            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
794        )
795
796        assert_equal(secure.result_variant(), 'success')
797        assert_equal(wait_security.result_variant(), 'success')
798
799        asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService)))
800        dut_asha = AioAsha(self.dut.aio.channel)
801
802        volume_future = asyncio.get_running_loop().create_future()
803
804        def volume_command_handler(connection: Connection, data: int):
805            volume_future.set_result(data)
806
807        asha_service.on('volume', volume_command_handler)
808
809        await dut_asha.WaitPeripheral(connection=dut_ref)
810        await dut_asha.Start(connection=dut_ref)
811        # set volume to max volume
812        _, volume_result = await asyncio.gather(dut_asha.SetVolume(1), asyncio.wait_for(volume_future, timeout=3.0))
813
814        logging.info(f"start_result:{volume_result}")
815        assert_is_not_none(volume_result)
816        assert_equal(volume_result, 0)
817
818    @asynchronous
819    async def test_music_stop(self) -> None:
820        """
821        DUT discovers Ref.
822        DUT initiates connection to Ref.
823        Verify that DUT and Ref are bonded and connected.
824        DUT is streaming media to Ref.
825        DUT stops media streaming on Ref.
826        Verify that DUT sends a correct AudioControlPoint `Stop` command.
827        """
828
829        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
830            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
831            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
832            # DUT initiates connection to ref_device.
833            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
834            advertisement.cancel()
835
836            return dut_ref, ref_dut
837
838        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
839
840        # DUT starts pairing with the ref_left
841        (secure, wait_security) = await asyncio.gather(
842            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
843            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
844        )
845
846        assert_equal(secure.result_variant(), 'success')
847        assert_equal(wait_security.result_variant(), 'success')
848
849        dut_asha = AioAsha(self.dut.aio.channel)
850
851        stop_future = self.get_stop_future(self.ref_left)
852
853        await dut_asha.WaitPeripheral(connection=dut_ref)
854        await dut_asha.Start(connection=dut_ref)
855        logging.info("send stop")
856        _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0))
857
858        logging.info(f"stop_result:{stop_result}")
859        assert_is_not_none(stop_result)
860
861        ref_asha = AioAsha(self.ref_left.aio.channel)
862        try:
863            ref_asha.CaptureAudio(connection=ref_dut, timeout=2)
864        except grpc.aio.AioRpcError as e:
865            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
866                logging.info("no audio data, work as expected")
867            else:
868                raise e
869
870    @asynchronous
871    async def test_music_restart(self) -> None:
872        """
873        DUT discovers Ref.
874        DUT initiates connection to Ref.
875        Verify that DUT and Ref are bonded and connected.
876        DUT starts media streaming.
877        DUT stops media streaming.
878        Verify that DUT sends a correct AudioControlPoint `Stop` command.
879        DUT starts media streaming again.
880        Verify that DUT sends a correct AudioControlPoint `Start` command.
881        """
882
883        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
884            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
885            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
886            # DUT initiates connection to ref_device.
887            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
888            advertisement.cancel()
889
890            return dut_ref, ref_dut
891
892        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
893
894        # DUT starts pairing with the ref_left
895        (secure, wait_security) = await asyncio.gather(
896            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
897            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
898        )
899
900        assert_equal(secure.result_variant(), 'success')
901        assert_equal(wait_security.result_variant(), 'success')
902
903        dut_asha = AioAsha(self.dut.aio.channel)
904
905        stop_future = self.get_stop_future(self.ref_left)
906
907        await dut_asha.WaitPeripheral(connection=dut_ref)
908        await dut_asha.Start(connection=dut_ref)
909        _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0))
910
911        logging.info(f"stop_result:{stop_result}")
912        assert_is_not_none(stop_result)
913
914        # restart music streaming
915        logging.info("restart music streaming")
916
917        start_future = self.get_start_future(self.ref_left)
918
919        await dut_asha.WaitPeripheral(connection=dut_ref)
920        _, start_result = await asyncio.gather(
921            dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0)
922        )
923
924        logging.info(f"start_result:{start_result}")
925        assert_is_not_none(start_result)
926
927    @asynchronous
928    async def test_music_start_dual_device(self) -> None:
929        """
930        DUT discovers Ref.
931        DUT initiates connection to Ref.
932        Verify that DUT and Ref are bonded and connected.
933        DUT starts media streaming.
934        Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1,
935        audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>).
936        """
937
938        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
939            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
940            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
941            # DUT initiates connection to ref_device.
942            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
943            advertisement.cancel()
944
945            return dut_ref, ref_dut
946
947        # connect ref_left
948        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
949        le_psm_future_left = self.get_le_psm_future(self.ref_left)
950        read_only_properties_future_left = self.get_read_only_properties_future(self.ref_left)
951
952        # DUT starts pairing with the ref_left
953        (secure_left, wait_security_left) = await asyncio.gather(
954            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
955            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
956        )
957
958        assert_equal(secure_left.result_variant(), 'success')
959        assert_equal(wait_security_left.result_variant(), 'success')
960
961        le_psm_out_result_left = await asyncio.wait_for(le_psm_future_left, timeout=3.0)
962        assert_is_not_none(le_psm_out_result_left)
963
964        read_only_properties_result_left = await asyncio.wait_for(read_only_properties_future_left, timeout=3.0)
965        assert_is_not_none(read_only_properties_result_left)
966
967        dut_asha = AioAsha(self.dut.aio.channel)
968        start_future_left = self.get_start_future(self.ref_left)
969
970        logging.info("send start")
971        await dut_asha.WaitPeripheral(connection=dut_ref_left)
972        _, start_result_left = await asyncio.gather(
973            dut_asha.Start(connection=dut_ref_left), asyncio.wait_for(start_future_left, timeout=3.0)
974        )
975
976        logging.info(f"start_result_left:{start_result_left}")
977        assert_is_not_none(start_result_left)
978        assert_equal(start_result_left['codec'], 1)
979        assert_equal(start_result_left['audiotype'], 0)
980        assert_is_not_none(start_result_left['volume'])
981        assert_equal(start_result_left['otherstate'], 0)
982
983        # connect ref_right
984        dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT)
985        le_psm_future_right = self.get_le_psm_future(self.ref_right)
986        read_only_properties_future_right = self.get_read_only_properties_future(self.ref_right)
987
988        # DUT starts pairing with the ref_right
989        (secure_right, wait_security_right) = await asyncio.gather(
990            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
991            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
992        )
993
994        assert_equal(secure_right.result_variant(), 'success')
995        assert_equal(wait_security_right.result_variant(), 'success')
996
997        le_psm_out_result_right = await asyncio.wait_for(le_psm_future_right, timeout=3.0)
998        assert_is_not_none(le_psm_out_result_right)
999
1000        read_only_properties_result_right = await asyncio.wait_for(read_only_properties_future_right, timeout=3.0)
1001        assert_is_not_none(read_only_properties_result_right)
1002
1003        start_future_right = self.get_start_future(self.ref_right)
1004
1005        logging.info("send start_right")
1006        await dut_asha.WaitPeripheral(connection=dut_ref_right)
1007        start_result_right = await asyncio.wait_for(start_future_right, timeout=10.0)
1008
1009        logging.info(f"start_result_right:{start_result_right}")
1010        assert_is_not_none(start_result_right)
1011        assert_equal(start_result_right['codec'], 1)
1012        assert_equal(start_result_right['audiotype'], 0)
1013        assert_is_not_none(start_result_right['volume'])
1014        # ref_left already connected, otherstate = 1
1015        assert_equal(start_result_right['otherstate'], 1)
1016
1017
if __name__ == "__main__":
    # Script entry point: turn on DEBUG-level logging, then hand control over
    # to the Mobly test runner.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
1021