1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from blueberry.tests.gd.cert.captures import HciCaptures 20from blueberry.tests.gd.cert.closable import safeClose 21from blueberry.tests.gd.cert.event_stream import EventStream 22from blueberry.tests.gd.cert.matchers import HciMatchers 23from blueberry.tests.gd.cert.py_hci import PyHci 24from blueberry.tests.gd.cert.py_security import PySecurity 25from blueberry.tests.gd.cert.truth import assertThat 26from google.protobuf import empty_pb2 as empty_proto 27from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade 28from blueberry.facade.security.facade_pb2 import IoCapabilities 29from blueberry.facade.security.facade_pb2 import AuthenticationRequirements 30from blueberry.facade.security.facade_pb2 import OobDataPresent 31from blueberry.utils import bluetooth 32import hci_packets as hci 33 34 35class CertSecurity(PySecurity): 36 """ 37 Contain all of the certification stack logic for sending and receiving 38 HCI commands following the Classic Pairing flows. 39 """ 40 _io_cap_lookup = { 41 IoCapabilities.DISPLAY_ONLY: hci.IoCapability.DISPLAY_ONLY, 42 IoCapabilities.DISPLAY_YES_NO_IO_CAP: hci.IoCapability.DISPLAY_YES_NO, 43 IoCapabilities.KEYBOARD_ONLY: hci.IoCapability.KEYBOARD_ONLY, 44 IoCapabilities.NO_INPUT_NO_OUTPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT, 45 } 46 47 _auth_req_lookup = { 48 AuthenticationRequirements.NO_BONDING: 49 hci.AuthenticationRequirements.NO_BONDING, 50 AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: 51 hci.AuthenticationRequirements.NO_BONDING_MITM_PROTECTION, 52 AuthenticationRequirements.DEDICATED_BONDING: 53 hci.AuthenticationRequirements.DEDICATED_BONDING, 54 AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: 55 hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION, 56 AuthenticationRequirements.GENERAL_BONDING: 57 hci.AuthenticationRequirements.GENERAL_BONDING, 58 AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: 59 hci.AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION, 60 } 61 62 _oob_present_lookup = { 63 OobDataPresent.NOT_PRESENT: hci.OobDataPresent.NOT_PRESENT, 64 OobDataPresent.P192_PRESENT: hci.OobDataPresent.P_192_PRESENT, 65 OobDataPresent.P256_PRESENT: hci.OobDataPresent.P_256_PRESENT, 66 OobDataPresent.P192_AND_256_PRESENT: hci.OobDataPresent.P_192_AND_256_PRESENT, 67 } 68 69 _hci_event_stream = None 70 _io_caps = hci.IoCapability.DISPLAY_ONLY 71 _auth_reqs = hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION 72 _secure_connections_enabled = False 73 74 _hci = None 75 76 MAX_PIN_LENGTH = 16 77 MIN_PIN_LENGTH = 1 78 79 def _enqueue_hci_command(self, command, expect_complete): 80 if (expect_complete): 81 self._hci.send_command(command) 82 else: 83 self._hci.send_command(command) 84 85 def __init__(self, device): 86 """ 87 Don't call super b/c the gRPC stream setup will crash test 88 """ 89 logging.info("Cert: Init") 90 self._device = device 91 self._device.wait_channel_ready() 92 self._hci = PyHci(device) 93 self._hci.register_for_events( 94 hci.EventCode.ENCRYPTION_CHANGE, hci.EventCode.CHANGE_CONNECTION_LINK_KEY_COMPLETE, 95 hci.EventCode.CENTRAL_LINK_KEY_COMPLETE, hci.EventCode.RETURN_LINK_KEYS, hci.EventCode.PIN_CODE_REQUEST, 96 hci.EventCode.LINK_KEY_REQUEST, hci.EventCode.LINK_KEY_NOTIFICATION, 97 hci.EventCode.ENCRYPTION_KEY_REFRESH_COMPLETE, hci.EventCode.IO_CAPABILITY_REQUEST, 98 hci.EventCode.IO_CAPABILITY_RESPONSE, hci.EventCode.REMOTE_OOB_DATA_REQUEST, 99 hci.EventCode.SIMPLE_PAIRING_COMPLETE, hci.EventCode.USER_PASSKEY_NOTIFICATION, 100 hci.EventCode.KEYPRESS_NOTIFICATION, hci.EventCode.USER_CONFIRMATION_REQUEST, 101 hci.EventCode.USER_PASSKEY_REQUEST, hci.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION) 102 self._hci_event_stream = self._hci.get_event_stream() 103 104 def create_bond(self, address, type): 105 """ 106 Creates a bond from the cert perspective 107 """ 108 logging.info("Cert: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address))) 109 # TODO(optedoblivion): Trigger connection to Send AuthenticationRequested 110 111 def remove_bond(self, address, type): 112 """ 113 We store the link key locally in the test and pretend 114 So to remove_bond we need to Remove the "stored" data 115 """ 116 pass 117 118 def set_io_capabilities(self, io_capabilities): 119 """ 120 Set the IO Capabilities used for the cert 121 """ 122 logging.info("Cert: setting IO Capabilities data to '%s'" % 123 self._io_capabilities_name_lookup.get(io_capabilities, "ERROR")) 124 self._io_caps = self._io_cap_lookup.get(io_capabilities, hci.IoCapability.DISPLAY_ONLY) 125 126 def set_authentication_requirements(self, auth_reqs): 127 """ 128 Establish authentication requirements for the stack 129 """ 130 logging.info("Cert: setting Authentication Requirements data to '%s'" % 131 self._auth_reqs_name_lookup.get(auth_reqs, "ERROR")) 132 self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci.AuthenticationRequirements.GENERAL_BONDING) 133 134 def get_oob_data_from_controller(self, pb_oob_data_type): 135 """ 136 Get the Out-of-band data for SSP pairing 137 138 :param pb_oob_data_type: Type of data needed 139 :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value 140 141 """ 142 oob_data_type = self._oob_present_lookup[pb_oob_data_type] 143 144 if (oob_data_type == hci.OobDataPresent.NOT_PRESENT): 145 logging.warn("No data present, no need to call get_oob_data") 146 return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], [0 for i in range(0, 16)], 147 [0 for i in range(0, 16)]) 148 149 logging.info("Cert: Requesting OOB data") 150 if oob_data_type == hci.OobDataPresent.P_192_PRESENT: 151 # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest 152 if self._secure_connections_enabled: 153 logging.info("Cert: Requesting P192 Data; secure connections") 154 complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture() 155 self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True) 156 logging.info("Cert: Waiting for OOB response from controller") 157 assertThat(self._hci_event_stream).emits(complete_capture) 158 complete = complete_capture.get() 159 return (list(complete.c192), list(complete.r192), [0 for i in range(0, 16)], [0 for i in range(0, 16)]) 160 # else we use ReadLocalDataRequest 161 else: 162 logging.info("Cert: Requesting P192 Data; no secure connections") 163 complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture() 164 self._enqueue_hci_command(hci.ReadLocalOobData(), True) 165 logging.info("Cert: Waiting for OOB response from controller") 166 assertThat(self._hci_event_stream).emits(complete_capture) 167 complete = complete_capture.get() 168 return (list(complete.c), list(complete.r), [0 for i in range(0, 16)], [0 for i in range(0, 16)]) 169 170 # Must be secure connection compatible to use these 171 elif oob_data_type == hci.OobDataPresent.P_256_PRESENT: 172 logging.info("Cert: Requesting P256 Extended Data; secure connections") 173 complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture() 174 self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True) 175 logging.info("Cert: Waiting for OOB response from controller") 176 assertThat(self._hci_event_stream).emits(complete_capture) 177 complete = complete_capture.get() 178 return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], list(complete.c256), list(complete.r256)) 179 180 else: # Both 181 logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections") 182 complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture() 183 self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True) 184 logging.info("Cert: Waiting for OOB response from controller") 185 assertThat(self._hci_event_stream).emits(complete_capture) 186 complete = complete_capture.get() 187 return (list(complete.c192), list(complete.r192), list(complete.c256), list(complete.r256)) 188 189 def input_passkey(self, address, passkey): 190 """ 191 Pretend to answer the pairing dialog as a user 192 """ 193 logging.info("Cert: Waiting for PASSKEY request") 194 assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST)) 195 logging.info("Cert: Send user input passkey %d for %s" % (passkey, address)) 196 peer = bluetooth.Address(address) 197 self._enqueue_hci_command( 198 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.ENTRY_STARTED), 199 True) 200 self._enqueue_hci_command( 201 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 202 True) 203 self._enqueue_hci_command( 204 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 205 True) 206 self._enqueue_hci_command( 207 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.CLEARED), True) 208 self._enqueue_hci_command( 209 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 210 True) 211 self._enqueue_hci_command( 212 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 213 True) 214 self._enqueue_hci_command( 215 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ERASED), 216 True) 217 self._enqueue_hci_command( 218 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 219 True) 220 self._enqueue_hci_command( 221 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 222 True) 223 self._enqueue_hci_command( 224 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 225 True) 226 self._enqueue_hci_command( 227 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 228 True) 229 self._enqueue_hci_command( 230 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED), 231 True) 232 self._enqueue_hci_command( 233 hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.ENTRY_COMPLETED), 234 True) 235 self._enqueue_hci_command(hci.UserPasskeyRequestReply(bd_addr=peer, numerical_value=passkey), True) 236 237 def input_pin(self, address, pin): 238 """ 239 Pretend to answer the pairing dialog as a user 240 """ 241 242 if len(pin) > self.MAX_PIN_LENGTH or len(pin) < self.MIN_PIN_LENGTH: 243 raise Exception("Pin code must be within range") 244 245 logging.info("Cert: Waiting for PIN request") 246 assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest()) 247 logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address)) 248 peer = address.decode('utf-8') 249 pin_list = list(pin) 250 # Pad 251 for i in range(self.MAX_PIN_LENGTH - len(pin_list)): 252 pin_list.append(0) 253 self._enqueue_hci_command( 254 hci.PinCodeRequestReply(bd_addr=bluetooth.Address(peer), pin_code_length=len(pin), pin_code=pin_list), True) 255 256 def __send_ui_callback(self, address, callback_type, b, uid, pin): 257 """ 258 Pretend to answer the pairing dailog as a user 259 """ 260 logging.info("Cert: Send user input callback uid:%d; response: %s" % (uid, b)) 261 # TODO(optedoblivion): Make callback and set it to the module 262 263 def enable_secure_simple_pairing(self): 264 """ 265 This is called when you want to enable SSP for testing 266 """ 267 logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]") 268 self._enqueue_hci_command(hci.WriteSimplePairingMode(simple_pairing_mode=hci.Enable.ENABLED), True) 269 logging.info("Cert: Waiting for controller response") 270 assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete(hci.OpCode.WRITE_SIMPLE_PAIRING_MODE)) 271 272 def enable_secure_connections(self): 273 """ 274 This is called when you want to enable secure connections support 275 """ 276 logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]") 277 self._enqueue_hci_command( 278 hci.WriteSecureConnectionsHostSupport(secure_connections_host_support=hci.Enable.ENABLED), True) 279 logging.info("Cert: Waiting for controller response") 280 assertThat(self._hci_event_stream).emits( 281 HciMatchers.CommandComplete(hci.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT)) 282 # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc) 283 #self._secure_connections_enabled = True 284 285 def send_io_caps(self, address): 286 logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST") 287 assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest()) 288 logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY") 289 oob_data_present = hci.OobDataPresent.NOT_PRESENT 290 self._enqueue_hci_command( 291 hci.IoCapabilityRequestReply(bd_addr=bluetooth.Address(address), 292 io_capability=self._io_caps, 293 oob_present=oob_data_present, 294 authentication_requirements=self._auth_reqs), True) 295 296 def accept_pairing(self, dut_address, reply_boolean, expect_to_fail, on_responder_reply): 297 """ 298 Here we handle the pairing events at the HCI level 299 """ 300 logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE") 301 assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse()) 302 self.send_io_caps(dut_address) 303 logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST") 304 assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest()) 305 logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean) 306 if reply_boolean: 307 logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_REPLY") 308 self._enqueue_hci_command(hci.UserConfirmationRequestReply(bd_addr=bluetooth.Address(dut_address)), True) 309 on_responder_reply() 310 logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE") 311 assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete()) 312 if not expect_to_fail: 313 logging.info("Cert: Waiting for LINK_KEY_NOTIFICATION") 314 assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyNotification()) 315 else: 316 logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY") 317 self._enqueue_hci_command(hci.UserConfirmationRequestNegativeReply(bd_addr=bluetooth.Address(dut_address)), 318 True) 319 on_responder_reply() 320 logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE") 321 assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete()) 322 323 def accept_oob_pairing(self, dut_address): 324 logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE") 325 assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse()) 326 self.send_io_caps(dut_address) 327 logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE") 328 ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture() 329 assertThat(self._hci_event_stream).emits(ssp_complete_capture) 330 ssp_complete = ssp_complete_capture.get() 331 logging.info(ssp_complete.status) 332 assertThat(ssp_complete.status).isEqualTo(hci.ErrorCode.SUCCESS) 333 334 def on_user_input(self, dut_address, reply_boolean, expected_ui_event): 335 """ 336 Cert doesn't need the test to respond to the ui event 337 Cert responds in accept pairing 338 """ 339 pass 340 341 def wait_for_bond_event(self, expected_bond_event): 342 """ 343 A bond event will be triggered once the bond process 344 is complete. For the DUT we need to wait for it, 345 for Cert it isn't needed. 346 """ 347 pass 348 349 def enforce_security_policy(self, address, type, policy): 350 """ 351 Pass for now 352 """ 353 pass 354 355 def wait_for_enforce_security_event(self, expected_enforce_security_event): 356 """ 357 Cert side needs to pass 358 """ 359 pass 360 361 def wait_for_disconnect_event(self): 362 """ 363 Cert side needs to pass 364 """ 365 pass 366 # FIXME: Gabeldorsche facade don't allow us to register for an DISCONNECT_COMPLETE event 367 # logging.info("Cert: Waiting for DISCONNECT_COMPLETE") 368 # assertThat(self._hci_event_stream).emits(HciMatchers.DisconnectionComplete()) 369 370 def close(self): 371 safeClose(self._hci) 372