#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import timedelta
import os
import sys
import logging

from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts
from google.protobuf import empty_pb2 as empty_proto
from facade import common_pb2 as common
from facade import rootservice_pb2 as facade_rootservice_pb2
from hci.facade import facade_pb2 as hci_facade
from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
from hci.facade import controller_facade_pb2 as controller_facade
from l2cap.classic import facade_pb2 as l2cap_facade
from neighbor.facade import facade_pb2 as neighbor_facade
from security import facade_pb2 as security_facade
from bluetooth_packets_python3 import hci_packets
import bluetooth_packets_python3 as bt_packets


class SimpleSecurityTest(GdFacadeOnlyBaseTestClass):
    """Pairing tests between a device under test (DUT) and a cert device.

    The DUT is driven through its security and neighbor facades, while the
    cert device is driven with HCI commands and observed through its HCI
    event stream.
    """

    def setup_test(self):
        """Start both stacks and record addresses and names for the test.

        The DUT stack is started with the 'SECURITY' module under test and
        the cert stack with 'L2CAP'. Page scan is enabled on both devices,
        their local addresses are cached on ``self`` (raw and with address
        type), and each device is given a distinct local name.
        """
        dut = self.device_under_test
        cert = self.cert_device

        dut_module = facade_rootservice_pb2.BluetoothModule.Value('SECURITY')
        dut.rootservice.StartStack(
            facade_rootservice_pb2.StartStackRequest(
                module_under_test=dut_module))
        cert_module = facade_rootservice_pb2.BluetoothModule.Value('L2CAP')
        cert.rootservice.StartStack(
            facade_rootservice_pb2.StartStackRequest(
                module_under_test=cert_module))

        dut_local = dut.controller_read_only_property.ReadLocalAddress(
            empty_proto.Empty())
        dut.address = dut_local.address
        cert_local = cert.controller_read_only_property.ReadLocalAddress(
            empty_proto.Empty())
        cert.address = cert_local.address

        dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
        cert.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))

        self.dut_address = common.BluetoothAddress(address=dut.address)
        self.cert_address = common.BluetoothAddress(address=cert.address)

        # Typed variants of both addresses, marked as public/identity.
        self.dut_address_with_type = common.BluetoothAddressWithType()
        self.dut_address_with_type.address.CopyFrom(self.dut_address)
        self.dut_address_with_type.type = (
            common.BluetoothPeerAddressTypeEnum.
            PUBLIC_DEVICE_OR_IDENTITY_ADDRESS)

        self.cert_address_with_type = common.BluetoothAddressWithType()
        self.cert_address_with_type.address.CopyFrom(self.cert_address)
        self.cert_address_with_type.type = (
            common.BluetoothPeerAddressTypeEnum.
            PUBLIC_DEVICE_OR_IDENTITY_ADDRESS)

        dut.wait_channel_ready()
        cert.wait_channel_ready()

        self.cert_name = b'ImTheCert'
        cert.hci_controller.WriteLocalName(
            controller_facade.NameMsg(name=self.cert_name))
        self.dut_name = b'ImTheDUT'
        dut.hci_controller.WriteLocalName(
            controller_facade.NameMsg(name=self.dut_name))

    def teardown_test(self):
        """Stop the stack on the DUT, then on the cert device."""
        for device in (self.device_under_test, self.cert_device):
            device.rootservice.StopStack(
                facade_rootservice_pb2.StopStackRequest())

    def tmp_register_for_event(self, event_code):
        """Register an HCI event handler for ``event_code`` on the DUT."""
        request = hci_facade.EventCodeMsg(code=int(event_code))
        self.device_under_test.hci.RegisterEventHandler(request)

    def tmp_enqueue_hci_command(self, command, expect_complete):
        """Serialize ``command`` and enqueue it on the DUT's HCI facade.

        Args:
            command: packet builder exposing ``Serialize()``.
            expect_complete: truthy to enqueue with
                ``EnqueueCommandWithComplete``, falsy to enqueue with
                ``EnqueueCommandWithStatus``.
        """
        request = hci_facade.CommandMsg(command=bytes(command.Serialize()))
        hci = self.device_under_test.hci
        enqueue = (hci.EnqueueCommandWithComplete
                   if expect_complete else hci.EnqueueCommandWithStatus)
        enqueue(request)

    def register_for_event(self, event_code):
        """Register an HCI event handler for ``event_code`` on the cert."""
        request = hci_facade.EventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterEventHandler(request)

    def enqueue_hci_command(self, command, expect_complete):
        """Serialize ``command`` and enqueue it on the cert's HCI facade.

        Args:
            command: packet builder exposing ``Serialize()``.
            expect_complete: truthy to enqueue with
                ``EnqueueCommandWithComplete``, falsy to enqueue with
                ``EnqueueCommandWithStatus``.
        """
        request = hci_facade.CommandMsg(command=bytes(command.Serialize()))
        hci = self.cert_device.hci
        enqueue = (hci.EnqueueCommandWithComplete
                   if expect_complete else hci.EnqueueCommandWithStatus)
        enqueue(request)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        """Send ACL payload ``acl`` from the cert device.

        Args:
            handle: connection handle, converted with ``int()``.
            pb_flag: packet boundary flag, converted with ``int()``.
            b_flag: broadcast flag, converted with ``int()``.
            acl: payload placed in the message's ``data`` field.
        """
        request = hci_facade.AclMsg(
            handle=int(handle),
            packet_boundary_flag=int(pb_flag),
            broadcast_flag=int(b_flag),
            data=acl)
        self.cert_device.hci.SendAclData(request)

    def pair_justworks(self, cert_iocap_reply, expected_ui_event):
        """Bond the DUT with the cert device and assert each pairing step.

        Args:
            cert_iocap_reply: HCI command builder the cert sends once it
                sees an IO Capability Request event.
            expected_ui_event: ``message_type`` of the UI event the DUT is
                expected to raise during pairing.
        """
        dut = self.device_under_test
        cert = self.cert_device

        # Cert event registration
        for code in (
                hci_packets.EventCode.LINK_KEY_REQUEST,
                hci_packets.EventCode.IO_CAPABILITY_REQUEST,
                hci_packets.EventCode.IO_CAPABILITY_RESPONSE,
                hci_packets.EventCode.USER_PASSKEY_NOTIFICATION,
                hci_packets.EventCode.USER_CONFIRMATION_REQUEST,
                hci_packets.EventCode.
                REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION,
                hci_packets.EventCode.LINK_KEY_NOTIFICATION,
                hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE,
        ):
            self.register_for_event(code)

        def logged_event_with(code):
            """Build a predicate that logs an event and looks for ``code``."""

            def predicate(event):
                logging.debug(event.event)
                return code in event.event

            return predicate

        with EventCallbackStream(dut.security.FetchUiEvents(empty_proto.Empty())) as ui_events, \
            EventCallbackStream(dut.security.FetchBondEvents(empty_proto.Empty())) as bond_events, \
            EventCallbackStream(dut.neighbor.GetRemoteNameEvents(empty_proto.Empty())) as name_events, \
            EventCallbackStream(cert.hci.FetchEvents(empty_proto.Empty())) as cert_hci_events:

            cert_hci_asserts = EventAsserts(cert_hci_events)
            ui_asserts = EventAsserts(ui_events)
            bond_asserts = EventAsserts(bond_events)
            name_asserts = EventAsserts(name_events)

            dut_address = dut.hci_controller.GetMacAddress(
                empty_proto.Empty()).address
            cert_address = cert.hci_controller.GetMacAddress(
                empty_proto.Empty()).address

            # Enable Simple Secure Pairing
            enable_ssp = hci_packets.WriteSimplePairingModeBuilder(
                hci_packets.Enable.ENABLED)
            self.enqueue_hci_command(enable_ssp, True)

            # NOTE(review): presumably the Command Complete event for
            # Write Simple Pairing Mode (opcode 0x0C56) -- confirm.
            cert_hci_asserts.assert_event_occurs(
                lambda msg: b'\x0e\x04\x01\x56\x0c' in msg.event)

            # Get the name
            name_request = neighbor_facade.RemoteNameRequestMsg(
                address=cert_address,
                page_scan_repetition_mode=1,
                clock_offset=0x6855)
            dut.neighbor.ReadRemoteName(name_request)

            name_asserts.assert_event_occurs(
                lambda msg: self.cert_name in msg.name)

            bond_target = common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=cert_address),
                type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)
            dut.security.CreateBond(bond_target)

            cert_hci_asserts.assert_event_occurs(
                logged_event_with(
                    hci_packets.EventCode.IO_CAPABILITY_REQUEST))

            self.enqueue_hci_command(cert_iocap_reply, True)

            cert_hci_asserts.assert_event_occurs(
                logged_event_with(
                    hci_packets.EventCode.USER_CONFIRMATION_REQUEST))
            confirmation = hci_packets.UserConfirmationRequestReplyBuilder(
                dut_address.decode('utf8'))
            self.enqueue_hci_command(confirmation, True)

            logging.info("Waiting for UI event")
            # Single-slot holder so the callback can hand the id back out.
            captured_id = [-1]

            def matches_expected_ui(event):
                if event.message_type != expected_ui_event:
                    return False
                captured_id[0] = event.unique_id
                return True

            ui_asserts.assert_event_occurs(matches_expected_ui)

            logging.info("Sending UI response")
            ui_reply = security_facade.UiCallbackMsg(
                message_type=security_facade.UiCallbackType.YES_NO,
                boolean=True,
                unique_id=captured_id[0])
            dut.security.SendUiCallback(ui_reply)

            bonded = security_facade.BondMsgType.DEVICE_BONDED
            bond_asserts.assert_event_occurs(
                lambda bond_event: bond_event.message_type == bonded)

    def _pair_with_cert_io_capability(self, io_capability, expected_ui_event):
        """Run ``pair_justworks`` with the cert advertising ``io_capability``.

        The cert's IO Capability Request Reply is addressed to the DUT's MAC
        address, reports no OOB data and requests dedicated bonding with
        MITM protection.
        """
        dut_address = self.device_under_test.hci_controller.GetMacAddress(
            empty_proto.Empty()).address
        iocap_reply = hci_packets.IoCapabilityRequestReplyBuilder(
            dut_address.decode('utf8'),
            io_capability,
            hci_packets.OobDataPresent.NOT_PRESENT,
            hci_packets.AuthenticationRequirements.
            DEDICATED_BONDING_MITM_PROTECTION)
        self.pair_justworks(iocap_reply, expected_ui_event)

    def test_display_only(self):
        """Pair with a DISPLAY_ONLY cert; expect DISPLAY_YES_NO_WITH_VALUE."""
        self._pair_with_cert_io_capability(
            hci_packets.IoCapability.DISPLAY_ONLY,
            security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE)

    def test_no_input_no_output(self):
        """Pair with a NO_INPUT_NO_OUTPUT cert; expect DISPLAY_YES_NO."""
        self._pair_with_cert_io_capability(
            hci_packets.IoCapability.NO_INPUT_NO_OUTPUT,
            security_facade.UiMsgType.DISPLAY_YES_NO)

    def test_display_yes_no(self):
        """Pair with a DISPLAY_YES_NO cert; expect DISPLAY_YES_NO_WITH_VALUE."""
        self._pair_with_cert_io_capability(
            hci_packets.IoCapability.DISPLAY_YES_NO,
            security_facade.UiMsgType.DISPLAY_YES_NO_WITH_VALUE)