• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import itertools
18import logging
19
20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
21from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE
22from bumble.pairing import PairingDelegate
23from mobly import base_test, signals, test_runner
24from mobly.asserts import assert_equal  # type: ignore
25from mobly.asserts import assert_in  # type: ignore
26from mobly.asserts import assert_is_not_none  # type: ignore
27from mobly.asserts import fail  # type: ignore
28from pandora.host_pb2 import Connection
29from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, SecurityLevel, WaitSecurityResponse
30from typing import Callable, Coroutine, Optional, Tuple
31
32ALL_ROLES = (HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE)
33ALL_IO_CAPABILITIES = (
34    None,
35    PairingDelegate.DISPLAY_OUTPUT_ONLY,
36    PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
37    PairingDelegate.KEYBOARD_INPUT_ONLY,
38    PairingDelegate.NO_OUTPUT_NO_INPUT,
39    PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
40)
41
42
43class ClassicSspTest(base_test.BaseTestClass):  # type: ignore[misc]
44    '''
45    This class aim to test SSP (Secure Simple Pairing) on Classic
46    Bluetooth devices.
47    '''
48
49    devices: Optional[PandoraDevices] = None
50
51    # pandora devices.
52    dut: PandoraDevice
53    ref: PandoraDevice
54
55    @avatar.asynchronous
56    async def setup_class(self) -> None:
57        self.devices = PandoraDevices(self)
58        self.dut, self.ref, *_ = self.devices
59
60        # Enable BR/EDR mode and SSP for Bumble devices.
61        for device in self.devices:
62            if isinstance(device, BumblePandoraDevice):
63                device.config.setdefault('classic_enabled', True)
64                device.config.setdefault('classic_ssp_enabled', True)
65                device.config.setdefault(
66                    'server',
67                    {
68                        'io_capability': 'display_output_and_yes_no_input',
69                    },
70                )
71
72        await asyncio.gather(self.dut.reset(), self.ref.reset())
73
74    def teardown_class(self) -> None:
75        if self.devices:
76            self.devices.stop_all()
77
78    @avatar.asynchronous
79    async def setup_test(self) -> None:  # pytype: disable=wrong-arg-types
80        await asyncio.gather(self.dut.reset(), self.ref.reset())
81
82    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
83    @avatar.asynchronous
84    async def test_success_initiate_connection_initiate_pairing(
85        self,
86        ref_io_capability: Optional[PairingDelegate.IoCapability],
87        ref_role: Optional[int],
88    ) -> None:
89        # Override REF IO capability if supported.
90        set_io_capability(self.ref, ref_io_capability)
91
92        # Connection/pairing task.
93        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
94            dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref)
95            if ref_role is not None:
96                await role_switch(self.ref, ref_dut, ref_role)
97            return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2)
98
99        # Handle pairing.
100        initiator_pairing, acceptor_pairing = await handle_pairing(
101            self.dut,
102            self.ref,
103            connect_and_pair,
104        )
105
106        # Assert success.
107        assert_equal(initiator_pairing.result_variant(), 'success')
108        assert_equal(acceptor_pairing.result_variant(), 'success')
109
110    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
111    @avatar.asynchronous
112    async def test_success_initiate_connection_accept_pairing(
113        self,
114        ref_io_capability: Optional[PairingDelegate.IoCapability],
115        ref_role: Optional[int],
116    ) -> None:
117        if not isinstance(self.dut, BumblePandoraDevice):
118            raise signals.TestSkip('TODO: Fix rootcanal when both AOSP and Bumble trigger the auth.')
119
120        # Override REF IO capability if supported.
121        set_io_capability(self.ref, ref_io_capability)
122
123        # Connection/pairing task.
124        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
125            dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref)
126            if ref_role is not None:
127                await role_switch(self.ref, ref_dut, ref_role)
128            return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2)
129
130        # Handle pairing.
131        initiator_pairing, acceptor_pairing = await handle_pairing(
132            self.dut,
133            self.ref,
134            connect_and_pair,
135        )
136
137        # Assert success.
138        assert_equal(initiator_pairing.result_variant(), 'success')
139        assert_equal(acceptor_pairing.result_variant(), 'success')
140
141    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
142    @avatar.asynchronous
143    async def test_success_accept_connection_initiate_pairing(
144        self,
145        ref_io_capability: Optional[PairingDelegate.IoCapability],
146        ref_role: Optional[int],
147    ) -> None:
148        # Override REF IO capability if supported.
149        set_io_capability(self.ref, ref_io_capability)
150
151        # Connection/pairing task.
152        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
153            ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut)
154            if ref_role is not None:
155                await role_switch(self.ref, ref_dut, ref_role)
156            return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2)
157
158        # Handle pairing.
159        initiator_pairing, acceptor_pairing = await handle_pairing(
160            self.dut,
161            self.ref,
162            connect_and_pair,
163        )
164
165        # Assert success.
166        assert_equal(initiator_pairing.result_variant(), 'success')
167        assert_equal(acceptor_pairing.result_variant(), 'success')
168
169    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
170    @avatar.asynchronous
171    async def test_success_accept_connection_accept_pairing(
172        self,
173        ref_io_capability: Optional[PairingDelegate.IoCapability],
174        ref_role: Optional[int],
175    ) -> None:
176        # Override REF IO capability if supported.
177        set_io_capability(self.ref, ref_io_capability)
178
179        # Connection/pairing task.
180        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
181            ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut)
182            if ref_role is not None:
183                await role_switch(self.ref, ref_dut, ref_role)
184            return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2)
185
186        # Handle pairing.
187        initiator_pairing, acceptor_pairing = await handle_pairing(
188            self.dut,
189            self.ref,
190            connect_and_pair,
191        )
192
193        # Assert success.
194        assert_equal(initiator_pairing.result_variant(), 'success')
195        assert_equal(acceptor_pairing.result_variant(), 'success')
196
197
198def set_io_capability(device: PandoraDevice, io_capability: Optional[PairingDelegate.IoCapability]) -> None:
199    if io_capability is None:
200        return
201    if isinstance(device, BumblePandoraDevice):
202        # Override Bumble reference device default IO capability.
203        device.server_config.io_capability = io_capability
204    else:
205        raise signals.TestSkip('Unable to override IO capability on non Bumble device.')
206
207
208# Connection task.
209async def make_classic_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
210    '''Connect two device and returns both connection tokens.'''
211
212    (connect, wait_connection) = await asyncio.gather(
213        initiator.aio.host.Connect(address=acceptor.address),
214        acceptor.aio.host.WaitConnection(address=initiator.address),
215    )
216
217    # Assert connection are successful.
218    assert_equal(connect.result_variant(), 'connection')
219    assert_equal(wait_connection.result_variant(), 'connection')
220    assert_is_not_none(connect.connection)
221    assert_is_not_none(wait_connection.connection)
222    assert connect.connection and wait_connection.connection
223
224    # Returns connections.
225    return connect.connection, wait_connection.connection
226
227
228# Pairing task.
229async def authenticate(
230    initiator: PandoraDevice,
231    initiator_connection: Connection,
232    acceptor: PandoraDevice,
233    acceptor_connection: Connection,
234    security_level: SecurityLevel,
235) -> Tuple[SecureResponse, WaitSecurityResponse]:
236    '''Pair two device and returns both pairing responses.'''
237
238    return await asyncio.gather(
239        initiator.aio.security.Secure(connection=initiator_connection, classic=security_level),
240        acceptor.aio.security.WaitSecurity(connection=acceptor_connection, classic=security_level),
241    )
242
243
244# Role switch task.
245async def role_switch(
246    device: PandoraDevice,
247    connection: Connection,
248    role: int,
249) -> None:
250    '''Switch role if supported.'''
251
252    if not isinstance(device, BumblePandoraDevice):
253        return
254
255    connection_handle = int.from_bytes(connection.cookie.value, 'big')
256    bumble_connection = device.device.lookup_connection(connection_handle)
257    assert_is_not_none(bumble_connection)
258    assert bumble_connection
259
260    if bumble_connection.role != role:
261        device.log.info(f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}")
262        await bumble_connection.switch_role(role)
263
264
265# Handle pairing events task.
266async def handle_pairing(
267    dut: PandoraDevice,
268    ref: PandoraDevice,
269    connect_and_pair: Callable[[], Coroutine[None, None, Tuple[SecureResponse, WaitSecurityResponse]]],
270    confirm: Callable[[bool], bool] = lambda x: x,
271    passkey: Callable[[int], int] = lambda x: x,
272) -> Tuple[SecureResponse, WaitSecurityResponse]:
273
274    # Listen for pairing event on bot DUT and REF.
275    dut_pairing, ref_pairing = dut.aio.security.OnPairing(), ref.aio.security.OnPairing()
276
277    # Start connection/pairing.
278    connect_and_pair_task = asyncio.create_task(connect_and_pair())
279
280    try:
281        dut_ev = await asyncio.wait_for(anext(dut_pairing), timeout=25.0)
282        dut.log.info(f'DUT pairing event: {dut_ev.method_variant()}')
283
284        ref_ev = await asyncio.wait_for(anext(ref_pairing), timeout=3.0)
285        ref.log.info(f'REF pairing event: {ref_ev.method_variant()}')
286
287        if dut_ev.method_variant() in ('numeric_comparison', 'just_works'):
288            assert_in(ref_ev.method_variant(), ('numeric_comparison', 'just_works'))
289            confirm_res = True
290            if dut_ev.method_variant() == 'numeric_comparison' and ref_ev.method_variant() == 'numeric_comparison':
291                confirm_res = ref_ev.numeric_comparison == dut_ev.numeric_comparison
292            confirm_res = confirm(confirm_res)
293            dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, confirm=confirm_res))
294            ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, confirm=confirm_res))
295
296        elif dut_ev.method_variant() == 'passkey_entry_notification':
297            assert_equal(ref_ev.method_variant(), 'passkey_entry_request')
298            assert_is_not_none(dut_ev.passkey_entry_notification)
299            assert dut_ev.passkey_entry_notification is not None
300            passkey_res = passkey(dut_ev.passkey_entry_notification)
301            ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, passkey=passkey_res))
302
303        elif dut_ev.method_variant() == 'passkey_entry_request':
304            assert_equal(ref_ev.method_variant(), 'passkey_entry_notification')
305            assert_is_not_none(ref_ev.passkey_entry_notification)
306            assert ref_ev.passkey_entry_notification is not None
307            passkey_res = passkey(ref_ev.passkey_entry_notification)
308            dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, passkey=passkey_res))
309
310        else:
311            fail("")
312
313    except (asyncio.CancelledError, asyncio.TimeoutError):
314        logging.exception('Pairing timed-out.')
315
316    finally:
317
318        try:
319            (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0)
320            logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}')
321            return secure, wait_security
322
323        finally:
324            dut_pairing.cancel()
325            ref_pairing.cancel()
326
327
328if __name__ == '__main__':
329    logging.basicConfig(level=logging.DEBUG)
330    test_runner.main()  # type: ignore
331