# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import avatar
import itertools
import logging

from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
from bumble.hci import HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE
from bumble.pairing import PairingDelegate
from mobly import base_test, signals, test_runner
from mobly.asserts import assert_equal  # type: ignore
from mobly.asserts import assert_in  # type: ignore
from mobly.asserts import assert_is_not_none  # type: ignore
from mobly.asserts import fail  # type: ignore
from pandora.host_pb2 import Connection
from pandora.security_pb2 import LEVEL2, PairingEventAnswer, SecureResponse, SecurityLevel, WaitSecurityResponse
from typing import Callable, Coroutine, Optional, Tuple

# Roles the REF device is switched to in the parameterized tests.
ALL_ROLES = (HCI_CENTRAL_ROLE, HCI_PERIPHERAL_ROLE)

# IO capabilities applied to the REF device in the parameterized tests.
# `None` means "do not override the device's configured IO capability".
ALL_IO_CAPABILITIES = (
    None,
    PairingDelegate.DISPLAY_OUTPUT_ONLY,
    PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
    PairingDelegate.KEYBOARD_INPUT_ONLY,
    PairingDelegate.NO_OUTPUT_NO_INPUT,
    PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
)


class ClassicSspTest(base_test.BaseTestClass):  # type: ignore[misc]
    '''
    This class aims to test SSP (Secure Simple Pairing) on Classic
    Bluetooth devices.

    Each test is parameterized over every REF IO capability in
    `ALL_IO_CAPABILITIES` and every REF role in `ALL_ROLES`.
    '''

    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice

    @avatar.asynchronous
    async def setup_class(self) -> None:
        '''Create the Pandora devices, configure Bumble ones and reset DUT and REF.'''
        self.devices = PandoraDevices(self)
        self.dut, self.ref, *_ = self.devices

        # Enable BR/EDR mode and SSP for Bumble devices.
        # `setdefault` is used so values already present in the config are kept.
        for device in self.devices:
            if isinstance(device, BumblePandoraDevice):
                device.config.setdefault('classic_enabled', True)
                device.config.setdefault('classic_ssp_enabled', True)
                device.config.setdefault(
                    'server',
                    {
                        'io_capability': 'display_output_and_yes_no_input',
                    },
                )

        await asyncio.gather(self.dut.reset(), self.ref.reset())

    def teardown_class(self) -> None:
        '''Stop all devices, if they were created by `setup_class`.'''
        if self.devices:
            self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:  # pytype: disable=wrong-arg-types
        '''Reset DUT and REF concurrently before each test.'''
        await asyncio.gather(self.dut.reset(), self.ref.reset())

    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_initiate_connection_initiate_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        '''DUT initiates the connection, then DUT initiates pairing; both sides must succeed.'''
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')

    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_initiate_connection_accept_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        '''DUT initiates the connection, then REF initiates pairing; both sides must succeed.

        Skipped unless the DUT is a Bumble device.
        '''
        if not isinstance(self.dut, BumblePandoraDevice):
            raise signals.TestSkip('TODO: Fix rootcanal when both AOSP and Bumble trigger the auth.')

        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            dut_ref, ref_dut = await make_classic_connection(self.dut, self.ref)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')

    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_accept_connection_initiate_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        '''REF initiates the connection, then DUT initiates pairing; both sides must succeed.'''
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.dut, dut_ref, self.ref, ref_dut, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')

    @avatar.parameterized(*itertools.product(ALL_IO_CAPABILITIES, ALL_ROLES))  # type: ignore[misc]
    @avatar.asynchronous
    async def test_success_accept_connection_accept_pairing(
        self,
        ref_io_capability: Optional[PairingDelegate.IoCapability],
        ref_role: Optional[int],
    ) -> None:
        '''REF initiates the connection, then REF initiates pairing; both sides must succeed.'''
        # Override REF IO capability if supported.
        set_io_capability(self.ref, ref_io_capability)

        # Connection/pairing task.
        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
            ref_dut, dut_ref = await make_classic_connection(self.ref, self.dut)
            if ref_role is not None:
                await role_switch(self.ref, ref_dut, ref_role)
            return await authenticate(self.ref, ref_dut, self.dut, dut_ref, LEVEL2)

        # Handle pairing.
        initiator_pairing, acceptor_pairing = await handle_pairing(
            self.dut,
            self.ref,
            connect_and_pair,
        )

        # Assert success.
        assert_equal(initiator_pairing.result_variant(), 'success')
        assert_equal(acceptor_pairing.result_variant(), 'success')


def set_io_capability(device: PandoraDevice, io_capability: Optional[PairingDelegate.IoCapability]) -> None:
    '''Override the IO capability of `device`.

    Does nothing when `io_capability` is `None`.

    Raises:
        signals.TestSkip: if an override is requested on a device which is
            not a `BumblePandoraDevice`.
    '''
    if io_capability is None:
        return
    if isinstance(device, BumblePandoraDevice):
        # Override Bumble reference device default IO capability.
        device.server_config.io_capability = io_capability
    else:
        raise signals.TestSkip('Unable to override IO capability on non Bumble device.')


# Connection task.
async def make_classic_connection(initiator: PandoraDevice, acceptor: PandoraDevice) -> Tuple[Connection, Connection]:
    '''Connect two devices and return both connection tokens.

    Returns:
        A tuple `(initiator side connection, acceptor side connection)`.
    '''

    # `Connect` and `WaitConnection` are issued concurrently on each side.
    (connect, wait_connection) = await asyncio.gather(
        initiator.aio.host.Connect(address=acceptor.address),
        acceptor.aio.host.WaitConnection(address=initiator.address),
    )

    # Assert connections are successful.
    assert_equal(connect.result_variant(), 'connection')
    assert_equal(wait_connection.result_variant(), 'connection')
    assert_is_not_none(connect.connection)
    assert_is_not_none(wait_connection.connection)
    # Plain `assert` kept in addition to the mobly asserts (presumably for type narrowing).
    assert connect.connection and wait_connection.connection

    # Returns connections.
    return connect.connection, wait_connection.connection


# Pairing task.
async def authenticate(
    initiator: PandoraDevice,
    initiator_connection: Connection,
    acceptor: PandoraDevice,
    acceptor_connection: Connection,
    security_level: SecurityLevel,
) -> Tuple[SecureResponse, WaitSecurityResponse]:
    '''Pair two devices and return both pairing responses.

    The initiator calls `Secure` while the acceptor concurrently calls
    `WaitSecurity`, both with `security_level` as the classic level.

    Returns:
        A tuple `(initiator Secure response, acceptor WaitSecurity response)`.
    '''

    return await asyncio.gather(
        initiator.aio.security.Secure(connection=initiator_connection, classic=security_level),
        acceptor.aio.security.WaitSecurity(connection=acceptor_connection, classic=security_level),
    )


# Role switch task.
async def role_switch(
    device: PandoraDevice,
    connection: Connection,
    role: int,
) -> None:
    '''Switch role if supported.

    Only `BumblePandoraDevice` is supported; for any other device this is a
    silent no-op. The switch is only requested when the connection's current
    role differs from `role`.
    '''

    if not isinstance(device, BumblePandoraDevice):
        return

    # The connection cookie carries the connection handle as big-endian bytes.
    connection_handle = int.from_bytes(connection.cookie.value, 'big')
    bumble_connection = device.device.lookup_connection(connection_handle)
    assert_is_not_none(bumble_connection)
    assert bumble_connection

    if bumble_connection.role != role:
        device.log.info(f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}")
        await bumble_connection.switch_role(role)


# Handle pairing events task.
async def handle_pairing(
    dut: PandoraDevice,
    ref: PandoraDevice,
    connect_and_pair: Callable[[], Coroutine[None, None, Tuple[SecureResponse, WaitSecurityResponse]]],
    confirm: Callable[[bool], bool] = lambda x: x,
    passkey: Callable[[int], int] = lambda x: x,
) -> Tuple[SecureResponse, WaitSecurityResponse]:
    '''Run `connect_and_pair` while answering the pairing events of DUT and REF.

    Args:
        dut: device under test.
        ref: reference device.
        connect_and_pair: coroutine factory which connects the devices and
            starts pairing; its result is returned by this function.
        confirm: hook applied to the computed confirmation answer before it is
            sent to both devices (identity by default).
        passkey: hook applied to the notified passkey before it is sent to the
            device requesting passkey entry (identity by default).

    Returns:
        The tuple returned by `connect_and_pair`.

    Raises:
        mobly test failure signals when the pairing methods seen by DUT and REF
        are inconsistent or unexpected; `asyncio.TimeoutError` if
        `connect_and_pair` does not complete within 15 seconds after the
        pairing events were handled.
    '''

    # Listen for pairing event on both DUT and REF.
    dut_pairing, ref_pairing = dut.aio.security.OnPairing(), ref.aio.security.OnPairing()

    # Start connection/pairing.
    connect_and_pair_task = asyncio.create_task(connect_and_pair())

    try:
        try:
            dut_ev = await asyncio.wait_for(anext(dut_pairing), timeout=25.0)
            dut_method = dut_ev.method_variant()
            dut.log.info(f'DUT pairing event: {dut_method}')

            ref_ev = await asyncio.wait_for(anext(ref_pairing), timeout=3.0)
            ref_method = ref_ev.method_variant()
            ref.log.info(f'REF pairing event: {ref_method}')

            if dut_method in ('numeric_comparison', 'just_works'):
                assert_in(ref_method, ('numeric_comparison', 'just_works'))
                confirm_res = True
                # Values can only be compared when both sides display one.
                if dut_method == 'numeric_comparison' and ref_method == 'numeric_comparison':
                    confirm_res = ref_ev.numeric_comparison == dut_ev.numeric_comparison
                confirm_res = confirm(confirm_res)
                dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, confirm=confirm_res))
                ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, confirm=confirm_res))

            elif dut_method == 'passkey_entry_notification':
                # DUT displays the passkey, REF has to enter it.
                assert_equal(ref_method, 'passkey_entry_request')
                assert_is_not_none(dut_ev.passkey_entry_notification)
                assert dut_ev.passkey_entry_notification is not None
                passkey_res = passkey(dut_ev.passkey_entry_notification)
                ref_pairing.send_nowait(PairingEventAnswer(event=ref_ev, passkey=passkey_res))

            elif dut_method == 'passkey_entry_request':
                # REF displays the passkey, DUT has to enter it.
                assert_equal(ref_method, 'passkey_entry_notification')
                assert_is_not_none(ref_ev.passkey_entry_notification)
                assert ref_ev.passkey_entry_notification is not None
                passkey_res = passkey(ref_ev.passkey_entry_notification)
                dut_pairing.send_nowait(PairingEventAnswer(event=dut_ev, passkey=passkey_res))

            else:
                fail(f'Unexpected DUT pairing event: {dut_method}')

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # A missing pairing event is only logged: the pairing result is
            # still awaited below so the caller can assert on it.
            logging.exception('Pairing timed-out.')

        # NOTE: this must not live in a `finally` clause. A `return` inside
        # `finally` discards any in-flight exception, which used to silently
        # swallow the assertion failures raised above.
        (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0)
        logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}')
        return secure, wait_security

    finally:
        dut_pairing.cancel()
        ref_pairing.cancel()
        # Do not leak the pairing task when leaving on a failure; this is a
        # no-op when the task has already completed.
        connect_and_pair_task.cancel()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore