• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import grpc
18import logging
19
20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
21from bumble.smp import PairingDelegate
22from concurrent import futures
23from contextlib import suppress
24from mobly import base_test, signals, test_runner
25from mobly.asserts import assert_equal  # type: ignore
26from mobly.asserts import assert_in  # type: ignore
27from mobly.asserts import assert_is_none  # type: ignore
28from mobly.asserts import assert_is_not_none  # type: ignore
29from mobly.asserts import explicit_pass, fail  # type: ignore
30from pandora.host_pb2 import (
31    DISCOVERABLE_GENERAL,
32    DISCOVERABLE_LIMITED,
33    NOT_DISCOVERABLE,
34    PUBLIC,
35    RANDOM,
36    DataTypes,
37    DiscoverabilityMode,
38    OwnAddressType,
39)
40from pandora.security_pb2 import LE_LEVEL3, LEVEL2, PairingEventAnswer
41from typing import NoReturn, Optional
42
43
class ExampleTest(base_test.BaseTestClass):  # type: ignore[misc]
    """Example test cases run between a device under test and a reference device.

    `dut` and `ref` are the first two entries of the `PandoraDevices`
    collection built in `setup_class`. Several tests are skipped unless the
    reference device is a `BumblePandoraDevice`.
    """

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice

    def setup_class(self) -> None:
        """Create the device collection and pick the DUT and reference devices."""
        self.devices = PandoraDevices(self)
        # Only the first two devices are used; any extra ones are ignored.
        self.dut, self.ref, *_ = self.devices

        # Enable BR/EDR mode for Bumble devices.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)

    def teardown_class(self) -> None:
        """Stop every device, if the collection was created."""
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:  # pytype: disable=wrong-arg-types
        """Reset both devices concurrently before each test."""
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    def test_print_addresses(self) -> None:
        """Log the address of both devices."""
        dut_address = self.dut.address
        self.dut.log.info(f'Address: {dut_address}')
        ref_address = self.ref.address
        self.ref.log.info(f'Address: {ref_address}')

    def test_classic_connect(self) -> None:
        """Connect from the reference device to the DUT address, then disconnect."""
        dut_address = self.dut.address
        self.dut.log.info(f'Address: {dut_address}')
        connection = self.ref.host.Connect(address=dut_address).connection
        assert connection
        self.ref.log.info(f'Connected with: {dut_address}')
        self.ref.host.Disconnect(connection=connection)

    # Using this decorator allows us to write a single `test_le_connect` and
    # run it multiple times with different parameters.
    # Here we check that the connection still completes for each listed
    # combination of DUT and reference address types.
    @avatar.parameterized(
        (RANDOM, RANDOM),
        (RANDOM, PUBLIC),
    )  # type: ignore[misc]
    def test_le_connect(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
        """Establish an LE connection from the DUT to the advertising reference device.

        Args:
            dut_address_type: own address type the DUT uses to scan and connect.
            ref_address_type: own address type the reference device advertises with.

        Raises:
            signals.TestSkip: if the reference device is not a Bumble device.
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device')

        advertisement = self.ref.host.Advertise(legacy=True, connectable=True, own_address_type=ref_address_type)
        try:
            scan = self.dut.host.Scan(own_address_type=dut_address_type)
            try:
                # Look for the reference device under the address kind it advertises with.
                if ref_address_type == PUBLIC:
                    scan_response = next((x for x in scan if x.public == self.ref.address))
                    dut_ref = self.dut.host.ConnectLE(
                        public=scan_response.public,
                        own_address_type=dut_address_type,
                    ).connection
                else:
                    scan_response = next((x for x in scan if x.random == self.ref.random_address))
                    dut_ref = self.dut.host.ConnectLE(
                        random=scan_response.random,
                        own_address_type=dut_address_type,
                    ).connection
            finally:
                # Always stop scanning, even when the lookup or the connection fails.
                scan.cancel()
            ref_dut = next(advertisement).connection
        finally:
            # Always stop advertising, even when an earlier step fails.
            advertisement.cancel()
        assert dut_ref and ref_dut
        self.dut.host.Disconnect(connection=dut_ref)

    @avatar.rpc_except(
        {
            # This test should reach the `Inquiry` timeout.
            grpc.StatusCode.DEADLINE_EXCEEDED: lambda e: explicit_pass(e.details()),
        }
    )
    def test_not_discoverable(self) -> None:
        """Check that the DUT does not show up in an inquiry when not discoverable.

        A `DEADLINE_EXCEEDED` RPC error is mapped to an explicit pass by the
        `rpc_except` decorator above.
        """
        self.dut.host.SetDiscoverabilityMode(mode=NOT_DISCOVERABLE)
        inquiry = self.ref.host.Inquiry(timeout=3.0)
        try:
            assert_is_none(next((x for x in inquiry if x.address == self.dut.address), None))
        finally:
            inquiry.cancel()

    @avatar.parameterized(
        (DISCOVERABLE_LIMITED,),
        (DISCOVERABLE_GENERAL,),
    )  # type: ignore[misc]
    def test_discoverable(self, mode: DiscoverabilityMode) -> None:
        """Check that the DUT shows up in an inquiry for the given discoverability mode.

        Args:
            mode: discoverability mode to set on the DUT before the inquiry.
        """
        self.dut.host.SetDiscoverabilityMode(mode=mode)
        inquiry = self.ref.host.Inquiry(timeout=15.0)
        try:
            assert_is_not_none(next((x for x in inquiry if x.address == self.dut.address), None))
        finally:
            inquiry.cancel()

    @avatar.asynchronous
    async def test_wait_connection(self) -> None:  # pytype: disable=wrong-arg-types
        """Connect from the reference device while the DUT waits for that connection."""
        # Issue the DUT-side wait first; it is only awaited after the connect call returns.
        dut_ref_co = self.dut.aio.host.WaitConnection(address=self.ref.address)
        ref_dut = await self.ref.aio.host.Connect(address=self.dut.address)
        dut_ref = await dut_ref_co
        assert_is_not_none(ref_dut.connection)
        assert_is_not_none(dut_ref.connection)
        # NOTE(review): this plain assert looks redundant with the check above;
        # presumably it is kept for type narrowing - confirm.
        assert ref_dut.connection
        await self.ref.aio.host.Disconnect(connection=ref_dut.connection)

    def test_scan_response_data(self) -> None:
        """Check the Python types of the data fields reported for the DUT advertisement."""
        advertisement = self.dut.host.Advertise(
            legacy=True,
            data=DataTypes(
                complete_service_class_uuids16=['FDF0'],
            ),
            scan_response_data=DataTypes(
                include_class_of_device=True,
            ),
        )
        try:
            scan = self.ref.host.Scan()
            try:
                scan_response = next((x for x in scan if x.public == self.dut.address))
            finally:
                # Always stop scanning, even when the DUT is never found.
                scan.cancel()
        finally:
            # Always stop advertising, even when the scan fails.
            advertisement.cancel()

        assert_equal(type(scan_response.data.class_of_device), int)
        assert_equal(type(scan_response.data.complete_service_class_uuids16[0]), str)

    async def handle_pairing_events(self) -> NoReturn:
        """Answer pairing events on both devices until cancelled.

        Each iteration waits for one pairing event from each device, checks
        that the two pairing methods are compatible, and sends the matching
        answer(s). Both `OnPairing` streams are cancelled when the loop exits,
        which only happens through an exception or task cancellation.
        """
        ref_pairing_stream = self.ref.aio.security.OnPairing()
        dut_pairing_stream = self.dut.aio.security.OnPairing()

        try:
            while True:
                ref_pairing_event, dut_pairing_event = await asyncio.gather(
                    anext(ref_pairing_stream),  # pytype: disable=name-error
                    anext(dut_pairing_stream),  # pytype: disable=name-error
                )

                if dut_pairing_event.method_variant() in ('numeric_comparison', 'just_works'):
                    # Both sides only need to confirm.
                    assert_in(ref_pairing_event.method_variant(), ('numeric_comparison', 'just_works'))
                    dut_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=dut_pairing_event,
                            confirm=True,
                        )
                    )
                    ref_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=ref_pairing_event,
                            confirm=True,
                        )
                    )
                elif dut_pairing_event.method_variant() == 'passkey_entry_notification':
                    # The DUT reports a passkey; enter it on the reference device.
                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_request')
                    ref_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=ref_pairing_event,
                            passkey=dut_pairing_event.passkey_entry_notification,
                        )
                    )
                elif dut_pairing_event.method_variant() == 'passkey_entry_request':
                    # The reference device reports a passkey; enter it on the DUT.
                    assert_equal(ref_pairing_event.method_variant(), 'passkey_entry_notification')
                    dut_pairing_stream.send_nowait(
                        PairingEventAnswer(
                            event=dut_pairing_event,
                            passkey=ref_pairing_event.passkey_entry_notification,
                        )
                    )
                else:
                    fail("unreachable")

        finally:
            ref_pairing_stream.cancel()
            dut_pairing_stream.cancel()

    @staticmethod
    async def _cancel_pairing(pairing: 'asyncio.Task[NoReturn]') -> None:
        """Cancel the pairing handler task and wait for it to finish.

        Cancellation errors are suppressed; any other exception raised by the
        task is propagated to the caller.
        """
        pairing.cancel()
        with suppress(asyncio.CancelledError, futures.CancelledError):
            await pairing

    @avatar.parameterized(
        (PairingDelegate.NO_OUTPUT_NO_INPUT,),
        (PairingDelegate.KEYBOARD_INPUT_ONLY,),
        (PairingDelegate.DISPLAY_OUTPUT_ONLY,),
        (PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,),
        (PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,),
    )  # type: ignore[misc]
    @avatar.asynchronous
    async def test_classic_pairing(self, ref_io_capability: int) -> None:  # pytype: disable=wrong-arg-types
        """Connect, secure the link at `LEVEL2` from the reference side, then disconnect.

        Args:
            ref_io_capability: IO capability to set on the reference device.

        Raises:
            signals.TestSkip: if the reference device is not a Bumble device.
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # override reference device IO capability
        setattr(self.ref.device, 'io_capability', ref_io_capability)

        pairing = asyncio.create_task(self.handle_pairing_events())
        try:
            (dut_ref_res, ref_dut_res) = await asyncio.gather(
                self.dut.aio.host.WaitConnection(address=self.ref.address),
                self.ref.aio.host.Connect(address=self.dut.address),
            )

            assert_equal(ref_dut_res.result_variant(), 'connection')
            assert_equal(dut_ref_res.result_variant(), 'connection')
            ref_dut = ref_dut_res.connection
            dut_ref = dut_ref_res.connection
            assert ref_dut and dut_ref

            (secure, wait_security) = await asyncio.gather(
                self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2),
                self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2),
            )
        finally:
            # Stop the pairing handler even when connecting or securing fails,
            # so the task and its streams do not outlive this test.
            await self._cancel_pairing(pairing)

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        await asyncio.gather(
            self.dut.aio.host.Disconnect(connection=dut_ref),
            self.ref.aio.host.WaitDisconnection(connection=ref_dut),
        )

    @avatar.parameterized(
        (RANDOM, RANDOM, PairingDelegate.NO_OUTPUT_NO_INPUT),
        (RANDOM, RANDOM, PairingDelegate.KEYBOARD_INPUT_ONLY),
        (RANDOM, RANDOM, PairingDelegate.DISPLAY_OUTPUT_ONLY),
        (RANDOM, RANDOM, PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT),
        (RANDOM, RANDOM, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
        (RANDOM, PUBLIC, PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT),
    )  # type: ignore[misc]
    @avatar.asynchronous
    async def test_le_pairing(  # pytype: disable=wrong-arg-types
        self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, ref_io_capability: int
    ) -> None:
        """Connect over LE, secure the link at `LE_LEVEL3` from the reference side, then disconnect.

        Args:
            dut_address_type: own address type the DUT advertises with.
            ref_address_type: own address type the reference device scans and connects with.
            ref_io_capability: IO capability to set on the reference device.

        Raises:
            signals.TestSkip: if the reference device is not a Bumble device.
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # override reference device IO capability
        setattr(self.ref.device, 'io_capability', ref_io_capability)

        advertisement = self.dut.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=dut_address_type,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        # The pairing task starts while the advertisement is still active and
        # outlives it, so it is tracked here and cleaned up by the outer `finally`.
        pairing = None
        try:
            try:
                scan = self.ref.aio.host.Scan(own_address_type=ref_address_type)
                try:
                    # Identify the DUT by the manufacturer data it advertises.
                    dut = await anext(
                        (x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
                    )  # pytype: disable=name-error
                finally:
                    # Always stop scanning, even when the DUT is never found.
                    scan.cancel()
                assert dut

                pairing = asyncio.create_task(self.handle_pairing_events())
                (ref_dut_res, dut_ref_res) = await asyncio.gather(
                    self.ref.aio.host.ConnectLE(own_address_type=ref_address_type, **dut.address_asdict()),
                    anext(aiter(advertisement)),  # pytype: disable=name-error
                )
            finally:
                # Stop advertising once connected, or as soon as any step above fails.
                advertisement.cancel()

            ref_dut, dut_ref = ref_dut_res.connection, dut_ref_res.connection
            assert ref_dut and dut_ref

            (secure, wait_security) = await asyncio.gather(
                self.ref.aio.security.Secure(connection=ref_dut, le=LE_LEVEL3),
                self.dut.aio.security.WaitSecurity(connection=dut_ref, le=LE_LEVEL3),
            )
        finally:
            # Stop the pairing handler (if it was started) even on failure,
            # so the task and its streams do not outlive this test.
            if pairing is not None:
                await self._cancel_pairing(pairing)

        assert_equal(secure.result_variant(), 'success')
        assert_equal(wait_security.result_variant(), 'success')

        await asyncio.gather(
            self.dut.aio.host.Disconnect(connection=dut_ref),
            self.ref.aio.host.WaitDisconnection(connection=ref_dut),
        )
321
# Script entry point: only runs when the file is executed directly.
if __name__ == '__main__':
    # Configure root logging at DEBUG level before handing over to the test runner.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
325