1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import os 18import sys 19import logging 20 21from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass 22from cert.event_callback_stream import EventCallbackStream 23from cert.event_asserts import EventAsserts 24from google.protobuf import empty_pb2 as empty_proto 25from facade import rootservice_pb2 as facade_rootservice 26from hci.facade import acl_manager_facade_pb2 as acl_manager_facade 27from neighbor.facade import facade_pb2 as neighbor_facade 28from hci.facade import controller_facade_pb2 as controller_facade 29from hci.facade import facade_pb2 as hci_facade 30import bluetooth_packets_python3 as bt_packets 31from bluetooth_packets_python3 import hci_packets 32 33 34class AclManagerTest(GdFacadeOnlyBaseTestClass): 35 36 def setup_test(self): 37 self.device_under_test.rootservice.StartStack( 38 facade_rootservice.StartStackRequest( 39 module_under_test=facade_rootservice.BluetoothModule.Value( 40 'HCI_INTERFACES'),)) 41 self.cert_device.rootservice.StartStack( 42 facade_rootservice.StartStackRequest( 43 module_under_test=facade_rootservice.BluetoothModule.Value( 44 'HCI'),)) 45 46 self.device_under_test.wait_channel_ready() 47 self.cert_device.wait_channel_ready() 48 49 def teardown_test(self): 50 self.device_under_test.rootservice.StopStack( 51 facade_rootservice.StopStackRequest()) 52 self.cert_device.rootservice.StopStack( 53 facade_rootservice.StopStackRequest()) 54 55 def register_for_event(self, event_code): 56 msg = hci_facade.EventCodeMsg(code=int(event_code)) 57 self.cert_device.hci.RegisterEventHandler(msg) 58 59 def enqueue_hci_command(self, command, expect_complete): 60 cmd_bytes = bytes(command.Serialize()) 61 cmd = hci_facade.CommandMsg(command=cmd_bytes) 62 if (expect_complete): 63 self.cert_device.hci.EnqueueCommandWithComplete(cmd) 64 else: 65 self.cert_device.hci.EnqueueCommandWithStatus(cmd) 66 67 def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): 68 acl_msg = hci_facade.AclMsg( 69 handle=int(handle), 70 packet_boundary_flag=int(pb_flag), 71 broadcast_flag=int(b_flag), 72 data=acl) 73 self.cert_device.hci.SendAclData(acl_msg) 74 75 def test_dut_connects(self): 76 self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) 77 self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) 78 self.register_for_event( 79 hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 80 with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ 81 EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ 82 EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: 83 84 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 85 acl_data_asserts = EventAsserts(acl_data_stream) 86 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 87 88 # CERT Enables scans and gets its address 89 self.enqueue_hci_command( 90 hci_packets.WriteScanEnableBuilder( 91 hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) 92 93 cert_address = None 94 95 def get_address_from_complete(packet): 96 packet_bytes = packet.event 97 if b'\x0e\x0a\x01\x09\x10' in packet_bytes: 98 nonlocal cert_address 99 addr_view = hci_packets.ReadBdAddrCompleteView( 100 hci_packets.CommandCompleteView( 101 hci_packets.EventPacketView( 102 bt_packets.PacketViewLittleEndian( 103 list(packet_bytes))))) 104 cert_address = addr_view.GetBdAddr() 105 return True 106 return False 107 108 self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) 109 110 cert_hci_event_asserts.assert_event_occurs( 111 get_address_from_complete) 112 113 with EventCallbackStream( 114 self.device_under_test.hci_acl_manager.CreateConnection( 115 acl_manager_facade.ConnectionMsg( 116 address_type=int( 117 hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), 118 address=bytes(cert_address, 119 'utf8')))) as connection_event_stream: 120 121 connection_event_asserts = EventAsserts(connection_event_stream) 122 connection_request = None 123 124 def get_connect_request(packet): 125 if b'\x04\x0a' in packet.event: 126 nonlocal connection_request 127 connection_request = hci_packets.ConnectionRequestView( 128 hci_packets.EventPacketView( 129 bt_packets.PacketViewLittleEndian( 130 list(packet.event)))) 131 return True 132 return False 133 134 # Cert Accepts 135 cert_hci_event_asserts.assert_event_occurs(get_connect_request) 136 self.enqueue_hci_command( 137 hci_packets.AcceptConnectionRequestBuilder( 138 connection_request.GetBdAddr(), 139 hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), 140 False) 141 142 # Cert gets ConnectionComplete with a handle and sends ACL data 143 handle = 0xfff 144 145 def get_handle(packet): 146 packet_bytes = packet.event 147 if b'\x03\x0b\x00' in packet_bytes: 148 nonlocal handle 149 cc_view = hci_packets.ConnectionCompleteView( 150 hci_packets.EventPacketView( 151 bt_packets.PacketViewLittleEndian( 152 list(packet_bytes)))) 153 handle = cc_view.GetConnectionHandle() 154 return True 155 return False 156 157 cert_hci_event_asserts.assert_event_occurs(get_handle) 158 cert_handle = handle 159 160 self.enqueue_acl_data( 161 cert_handle, hci_packets.PacketBoundaryFlag. 162 FIRST_AUTOMATICALLY_FLUSHABLE, 163 hci_packets.BroadcastFlag.POINT_TO_POINT, 164 bytes( 165 b'\x26\x00\x07\x00This is just SomeAclData from the Cert' 166 )) 167 168 # DUT gets a connection complete event and sends and receives 169 handle = 0xfff 170 connection_event_asserts.assert_event_occurs(get_handle) 171 172 self.device_under_test.hci_acl_manager.SendAclData( 173 acl_manager_facade.AclData( 174 handle=handle, 175 payload=bytes( 176 b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' 177 ))) 178 179 cert_acl_data_asserts.assert_event_occurs( 180 lambda packet: b'SomeMoreAclData' in packet.data) 181 acl_data_asserts.assert_event_occurs( 182 lambda packet: b'SomeAclData' in packet.payload) 183 184 def test_cert_connects(self): 185 self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) 186 self.register_for_event(hci_packets.EventCode.ROLE_CHANGE) 187 self.register_for_event( 188 hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 189 with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ 190 EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ 191 EventCallbackStream(self.device_under_test.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ 192 EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: 193 194 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 195 incoming_connection_asserts = EventAsserts( 196 incoming_connection_stream) 197 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 198 acl_data_asserts = EventAsserts(acl_data_stream) 199 200 # DUT Enables scans and gets its address 201 dut_address = self.device_under_test.hci_controller.GetMacAddress( 202 empty_proto.Empty()).address 203 204 self.device_under_test.neighbor.EnablePageScan( 205 neighbor_facade.EnableMsg(enabled=True)) 206 207 # Cert connects 208 self.enqueue_hci_command( 209 hci_packets.CreateConnectionBuilder( 210 dut_address.decode('utf-8'), 211 0xcc18, # Packet Type 212 hci_packets.PageScanRepetitionMode.R1, 213 0x0, 214 hci_packets.ClockOffsetValid.INVALID, 215 hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH), 216 False) 217 218 conn_handle = 0xfff 219 220 def get_handle(packet): 221 packet_bytes = packet.event 222 if b'\x03\x0b\x00' in packet_bytes: 223 nonlocal conn_handle 224 cc_view = hci_packets.ConnectionCompleteView( 225 hci_packets.EventPacketView( 226 bt_packets.PacketViewLittleEndian( 227 list(packet_bytes)))) 228 conn_handle = cc_view.GetConnectionHandle() 229 return True 230 return False 231 232 # DUT gets a connection request 233 incoming_connection_asserts.assert_event_occurs(get_handle) 234 235 self.device_under_test.hci_acl_manager.SendAclData( 236 acl_manager_facade.AclData( 237 handle=conn_handle, 238 payload=bytes( 239 b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' 240 ))) 241 242 conn_handle = 0xfff 243 244 cert_hci_event_asserts.assert_event_occurs(get_handle) 245 cert_handle = conn_handle 246 247 self.enqueue_acl_data( 248 cert_handle, 249 hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, 250 hci_packets.BroadcastFlag.POINT_TO_POINT, 251 bytes( 252 b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) 253 254 cert_acl_data_asserts.assert_event_occurs( 255 lambda packet: b'SomeMoreAclData' in packet.data) 256 acl_data_asserts.assert_event_occurs( 257 lambda packet: b'SomeAclData' in packet.payload) 258 259 def test_recombination_l2cap_packet(self): 260 self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) 261 self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) 262 self.register_for_event( 263 hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 264 with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \ 265 EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ 266 EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: 267 268 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 269 acl_data_asserts = EventAsserts(acl_data_stream) 270 271 # CERT Enables scans and gets its address 272 self.enqueue_hci_command( 273 hci_packets.WriteScanEnableBuilder( 274 hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) 275 276 cert_address = None 277 278 def get_address_from_complete(packet): 279 packet_bytes = packet.event 280 if b'\x0e\x0a\x01\x09\x10' in packet_bytes: 281 nonlocal cert_address 282 addr_view = hci_packets.ReadBdAddrCompleteView( 283 hci_packets.CommandCompleteView( 284 hci_packets.EventPacketView( 285 bt_packets.PacketViewLittleEndian( 286 list(packet_bytes))))) 287 cert_address = addr_view.GetBdAddr() 288 return True 289 return False 290 291 self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) 292 293 cert_hci_event_asserts.assert_event_occurs( 294 get_address_from_complete) 295 296 with EventCallbackStream( 297 self.device_under_test.hci_acl_manager.CreateConnection( 298 acl_manager_facade.ConnectionMsg( 299 address_type=int( 300 hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), 301 address=bytes(cert_address, 302 'utf8')))) as connection_event_stream: 303 304 connection_event_asserts = EventAsserts(connection_event_stream) 305 connection_request = None 306 307 def get_connect_request(packet): 308 if b'\x04\x0a' in packet.event: 309 nonlocal connection_request 310 connection_request = hci_packets.ConnectionRequestView( 311 hci_packets.EventPacketView( 312 bt_packets.PacketViewLittleEndian( 313 list(packet.event)))) 314 return True 315 return False 316 317 # Cert Accepts 318 cert_hci_event_asserts.assert_event_occurs(get_connect_request) 319 self.enqueue_hci_command( 320 hci_packets.AcceptConnectionRequestBuilder( 321 connection_request.GetBdAddr(), 322 hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), 323 False) 324 325 # Cert gets ConnectionComplete with a handle and sends ACL data 326 handle = 0xfff 327 328 def get_handle(packet): 329 packet_bytes = packet.event 330 if b'\x03\x0b\x00' in packet_bytes: 331 nonlocal handle 332 cc_view = hci_packets.ConnectionCompleteView( 333 hci_packets.EventPacketView( 334 bt_packets.PacketViewLittleEndian( 335 list(packet_bytes)))) 336 handle = cc_view.GetConnectionHandle() 337 return True 338 return False 339 340 cert_hci_event_asserts.assert_event_occurs(get_handle) 341 cert_handle = handle 342 343 self.enqueue_acl_data( 344 cert_handle, hci_packets.PacketBoundaryFlag. 345 FIRST_AUTOMATICALLY_FLUSHABLE, 346 hci_packets.BroadcastFlag.POINT_TO_POINT, 347 bytes(b'\x06\x00\x07\x00Hello')) 348 self.enqueue_acl_data( 349 cert_handle, 350 hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, 351 hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) 352 self.enqueue_acl_data( 353 cert_handle, hci_packets.PacketBoundaryFlag. 354 FIRST_AUTOMATICALLY_FLUSHABLE, 355 hci_packets.BroadcastFlag.POINT_TO_POINT, 356 bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200)) 357 358 # DUT gets a connection complete event and sends and receives 359 connection_event_asserts.assert_event_occurs(get_handle) 360 361 acl_data_asserts.assert_event_occurs( 362 lambda packet: b'Hello!' in packet.payload) 363 acl_data_asserts.assert_event_occurs( 364 lambda packet: b'Hello' * 200 in packet.payload) 365