1# 2# Copyright 2019 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import time 17from datetime import timedelta 18from mobly import asserts 19 20from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass 21from cert.event_asserts import EventAsserts 22from cert.event_callback_stream import EventCallbackStream 23from facade import common_pb2 24from facade import rootservice_pb2 as facade_rootservice 25from google.protobuf import empty_pb2 as empty_proto 26from l2cap.classic import facade_pb2 as l2cap_facade_pb2 27from neighbor.facade import facade_pb2 as neighbor_facade 28from hci.facade import acl_manager_facade_pb2 as acl_manager_facade 29import bluetooth_packets_python3 as bt_packets 30from bluetooth_packets_python3 import hci_packets, l2cap_packets 31 32# Assemble a sample packet. TODO: Use RawBuilder 33SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) 34 35 36class L2capTest(GdFacadeOnlyBaseTestClass): 37 38 def setup_test(self): 39 self.device_under_test.rootservice.StartStack( 40 facade_rootservice.StartStackRequest( 41 module_under_test=facade_rootservice.BluetoothModule.Value( 42 'L2CAP'),)) 43 self.cert_device.rootservice.StartStack( 44 facade_rootservice.StartStackRequest( 45 module_under_test=facade_rootservice.BluetoothModule.Value( 46 'HCI_INTERFACES'),)) 47 48 self.device_under_test.wait_channel_ready() 49 self.cert_device.wait_channel_ready() 50 51 self.device_under_test.address = self.device_under_test.hci_controller.GetMacAddress( 52 empty_proto.Empty()).address 53 cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress( 54 empty_proto.Empty()).address 55 self.cert_device.address = cert_address 56 self.dut_address = common_pb2.BluetoothAddress( 57 address=self.device_under_test.address) 58 self.cert_address = common_pb2.BluetoothAddress( 59 address=self.cert_device.address) 60 61 self.device_under_test.neighbor.EnablePageScan( 62 neighbor_facade.EnableMsg(enabled=True)) 63 64 self.cert_acl_handle = 0 65 66 self.on_connection_request = None 67 self.on_connection_response = None 68 self.on_configuration_request = None 69 self.on_configuration_response = None 70 self.on_disconnection_request = None 71 self.on_disconnection_response = None 72 self.on_information_request = None 73 self.on_information_response = None 74 75 self.scid_to_dcid = {} 76 self.ertm_tx_window_size = 10 77 self.ertm_max_transmit = 20 78 79 def _on_connection_request_default(self, l2cap_control_view): 80 connection_request_view = l2cap_packets.ConnectionRequestView( 81 l2cap_control_view) 82 sid = connection_request_view.GetIdentifier() 83 cid = connection_request_view.GetSourceCid() 84 85 self.scid_to_dcid[cid] = cid 86 87 connection_response = l2cap_packets.ConnectionResponseBuilder( 88 sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS, 89 l2cap_packets.ConnectionResponseStatus. 90 NO_FURTHER_INFORMATION_AVAILABLE) 91 connection_response_l2cap = l2cap_packets.BasicFrameBuilder( 92 1, connection_response) 93 self.cert_device.hci_acl_manager.SendAclData( 94 acl_manager_facade.AclData( 95 handle=self.cert_acl_handle, 96 payload=bytes(connection_response_l2cap.Serialize()))) 97 return True 98 99 def _on_connection_response_default(self, l2cap_control_view): 100 connection_response_view = l2cap_packets.ConnectionResponseView( 101 l2cap_control_view) 102 sid = connection_response_view.GetIdentifier() 103 scid = connection_response_view.GetSourceCid() 104 dcid = connection_response_view.GetDestinationCid() 105 self.scid_to_dcid[scid] = dcid 106 107 config_request = l2cap_packets.ConfigurationRequestBuilder( 108 sid + 1, dcid, l2cap_packets.Continuation.END, []) 109 config_request_l2cap = l2cap_packets.BasicFrameBuilder( 110 1, config_request) 111 self.cert_device.hci_acl_manager.SendAclData( 112 acl_manager_facade.AclData( 113 handle=self.cert_acl_handle, 114 payload=bytes(config_request_l2cap.Serialize()))) 115 return True 116 117 def _on_connection_response_use_ertm(self, l2cap_control_view): 118 connection_response_view = l2cap_packets.ConnectionResponseView( 119 l2cap_control_view) 120 sid = connection_response_view.GetIdentifier() 121 scid = connection_response_view.GetSourceCid() 122 dcid = connection_response_view.GetDestinationCid() 123 self.scid_to_dcid[scid] = dcid 124 125 # FIXME: This doesn't work! 126 ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( 127 ) 128 ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC 129 ertm_option.tx_window_size = self.ertm_tx_window_size 130 ertm_option.max_transmit = self.ertm_max_transmit 131 ertm_option.retransmission_time_out = 2000 132 ertm_option.monitor_time_out = 12000 133 ertm_option.maximum_pdu_size = 1010 134 135 options = [ertm_option] 136 137 config_request = l2cap_packets.ConfigurationRequestBuilder( 138 sid + 1, dcid, l2cap_packets.Continuation.END, options) 139 140 config_request_l2cap = l2cap_packets.BasicFrameBuilder( 141 1, config_request) 142 143 config_packet = bytearray([ 144 0x1a, 145 0x00, 146 0x01, 147 0x00, 148 0x04, 149 sid + 1, 150 0x16, 151 0x00, 152 dcid & 0xff, 153 dcid >> 8, 154 0x00, 155 0x00, 156 0x01, 157 0x02, 158 0xa0, 159 0x02, # MTU 160 0x04, 161 0x09, 162 0x03, 163 self.ertm_tx_window_size, 164 self.ertm_max_transmit, 165 0xd0, 166 0x07, 167 0xe0, 168 0x2e, 169 0xf2, 170 0x03, # ERTM 171 0x05, 172 0x01, 173 0x00 # FCS 174 ]) 175 176 self.cert_device.hci_acl_manager.SendAclData( 177 acl_manager_facade.AclData( 178 handle=self.cert_acl_handle, payload=bytes(config_packet))) 179 return True 180 181 def _on_connection_response_use_ertm_and_fcs(self, l2cap_control_view): 182 connection_response_view = l2cap_packets.ConnectionResponseView( 183 l2cap_control_view) 184 sid = connection_response_view.GetIdentifier() 185 scid = connection_response_view.GetSourceCid() 186 dcid = connection_response_view.GetDestinationCid() 187 self.scid_to_dcid[scid] = dcid 188 189 # FIXME: This doesn't work! 190 ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption( 191 ) 192 ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC 193 ertm_option.tx_window_size = self.ertm_tx_window_size 194 ertm_option.max_transmit = self.ertm_max_transmit 195 ertm_option.retransmission_time_out = 2000 196 ertm_option.monitor_time_out = 12000 197 ertm_option.maximum_pdu_size = 1010 198 199 options = [ertm_option] 200 201 config_request = l2cap_packets.ConfigurationRequestBuilder( 202 sid + 1, dcid, l2cap_packets.Continuation.END, options) 203 204 config_request_l2cap = l2cap_packets.BasicFrameBuilder( 205 1, config_request) 206 207 config_packet = bytearray([ 208 0x1a, 209 0x00, 210 0x01, 211 0x00, 212 0x04, 213 sid + 1, 214 0x16, 215 0x00, 216 dcid & 0xff, 217 dcid >> 8, 218 0x00, 219 0x00, 220 0x01, 221 0x02, 222 0xa0, 223 0x02, # MTU 224 0x04, 225 0x09, 226 0x03, 227 self.ertm_tx_window_size, 228 self.ertm_max_transmit, 229 0xd0, 230 0x07, 231 0xe0, 232 0x2e, 233 0xf2, 234 0x03, # ERTM 235 0x05, 236 0x01, 237 0x01 # FCS 238 ]) 239 240 self.cert_device.hci_acl_manager.SendAclData( 241 acl_manager_facade.AclData( 242 handle=self.cert_acl_handle, payload=bytes(config_packet))) 243 return True 244 245 def _on_configuration_request_default(self, l2cap_control_view): 246 configuration_request = l2cap_packets.ConfigurationRequestView( 247 l2cap_control_view) 248 sid = configuration_request.GetIdentifier() 249 dcid = configuration_request.GetDestinationCid() 250 config_response = l2cap_packets.ConfigurationResponseBuilder( 251 sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END, 252 l2cap_packets.ConfigurationResponseResult.SUCCESS, []) 253 config_response_l2cap = l2cap_packets.BasicFrameBuilder( 254 1, config_response) 255 self.cert_send_b_frame(config_response_l2cap) 256 257 def _on_configuration_response_default(self, l2cap_control_view): 258 configuration_response = l2cap_packets.ConfigurationResponseView( 259 l2cap_control_view) 260 sid = configuration_response.GetIdentifier() 261 262 def _on_disconnection_request_default(self, l2cap_control_view): 263 disconnection_request = l2cap_packets.DisconnectionRequestView( 264 l2cap_control_view) 265 sid = disconnection_request.GetIdentifier() 266 scid = disconnection_request.GetSourceCid() 267 dcid = disconnection_request.GetDestinationCid() 268 disconnection_response = l2cap_packets.DisconnectionResponseBuilder( 269 sid, dcid, scid) 270 disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder( 271 1, disconnection_response) 272 self.cert_device.hci_acl_manager.SendAclData( 273 acl_manager_facade.AclData( 274 handle=self.cert_acl_handle, 275 payload=bytes(disconnection_response_l2cap.Serialize()))) 276 277 def _on_disconnection_response_default(self, l2cap_control_view): 278 disconnection_response = l2cap_packets.DisconnectionResponseView( 279 l2cap_control_view) 280 281 def _on_information_request_default(self, l2cap_control_view): 282 information_request = l2cap_packets.InformationRequestView( 283 l2cap_control_view) 284 sid = information_request.GetIdentifier() 285 information_type = information_request.GetInfoType() 286 if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: 287 response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( 288 sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) 289 response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) 290 self.cert_device.hci_acl_manager.SendAclData( 291 acl_manager_facade.AclData( 292 handle=self.cert_acl_handle, 293 payload=bytes(response_l2cap.Serialize()))) 294 return 295 if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: 296 response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( 297 sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1, 298 0, 1, 0, 0, 0, 0) 299 response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) 300 self.cert_device.hci_acl_manager.SendAclData( 301 acl_manager_facade.AclData( 302 handle=self.cert_acl_handle, 303 payload=bytes(response_l2cap.Serialize()))) 304 return 305 if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: 306 response = l2cap_packets.InformationResponseFixedChannelsBuilder( 307 sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) 308 response_l2cap = l2cap_packets.BasicFrameBuilder(1, response) 309 self.cert_device.hci_acl_manager.SendAclData( 310 acl_manager_facade.AclData( 311 handle=self.cert_acl_handle, 312 payload=bytes(response_l2cap.Serialize()))) 313 return 314 315 def _on_information_response_default(self, l2cap_control_view): 316 information_response = l2cap_packets.InformationResponseView( 317 l2cap_control_view) 318 319 def teardown_test(self): 320 self.device_under_test.rootservice.StopStack( 321 facade_rootservice.StopStackRequest()) 322 self.cert_device.rootservice.StopStack( 323 facade_rootservice.StopStackRequest()) 324 325 def _handle_control_packet(self, l2cap_packet): 326 packet_bytes = l2cap_packet.payload 327 l2cap_view = l2cap_packets.BasicFrameView( 328 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 329 if l2cap_view.GetChannelId() != 1: 330 return 331 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 332 if l2cap_control_view.GetCode( 333 ) == l2cap_packets.CommandCode.CONNECTION_REQUEST: 334 return self.on_connection_request( 335 l2cap_control_view 336 ) if self.on_connection_request else self._on_connection_request_default( 337 l2cap_control_view) 338 if l2cap_control_view.GetCode( 339 ) == l2cap_packets.CommandCode.CONNECTION_RESPONSE: 340 return self.on_connection_response( 341 l2cap_control_view 342 ) if self.on_connection_response else self._on_connection_response_default( 343 l2cap_control_view) 344 if l2cap_control_view.GetCode( 345 ) == l2cap_packets.CommandCode.CONFIGURATION_REQUEST: 346 return self.on_configuration_request( 347 l2cap_control_view 348 ) if self.on_configuration_request else self._on_configuration_request_default( 349 l2cap_control_view) 350 if l2cap_control_view.GetCode( 351 ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE: 352 return self.on_configuration_response( 353 l2cap_control_view 354 ) if self.on_configuration_response else self._on_configuration_response_default( 355 l2cap_control_view) 356 if l2cap_control_view.GetCode( 357 ) == l2cap_packets.CommandCode.DISCONNECTION_REQUEST: 358 return self.on_disconnection_request( 359 l2cap_control_view 360 ) if self.on_disconnection_request else self._on_disconnection_request_default( 361 l2cap_control_view) 362 if l2cap_control_view.GetCode( 363 ) == l2cap_packets.CommandCode.DISCONNECTION_RESPONSE: 364 return self.on_disconnection_response( 365 l2cap_control_view 366 ) if self.on_disconnection_response else self._on_disconnection_response_default( 367 l2cap_control_view) 368 if l2cap_control_view.GetCode( 369 ) == l2cap_packets.CommandCode.INFORMATION_REQUEST: 370 return self.on_information_request( 371 l2cap_control_view 372 ) if self.on_information_request else self._on_information_request_default( 373 l2cap_control_view) 374 if l2cap_control_view.GetCode( 375 ) == l2cap_packets.CommandCode.INFORMATION_RESPONSE: 376 return self.on_information_response( 377 l2cap_control_view 378 ) if self.on_information_response else self._on_information_response_default( 379 l2cap_control_view) 380 return 381 382 def is_correct_connection_request(self, l2cap_packet): 383 packet_bytes = l2cap_packet.payload 384 l2cap_view = l2cap_packets.BasicFrameView( 385 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 386 if l2cap_view.GetChannelId() != 1: 387 return False 388 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 389 return l2cap_control_view.GetCode( 390 ) == l2cap_packets.CommandCode.CONNECTION_REQUEST 391 392 def is_correct_configuration_response(self, l2cap_packet): 393 packet_bytes = l2cap_packet.payload 394 l2cap_view = l2cap_packets.BasicFrameView( 395 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 396 if l2cap_view.GetChannelId() != 1: 397 return False 398 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 399 if l2cap_control_view.GetCode( 400 ) != l2cap_packets.CommandCode.CONFIGURATION_RESPONSE: 401 return False 402 configuration_response_view = l2cap_packets.ConfigurationResponseView( 403 l2cap_control_view) 404 return configuration_response_view.GetResult( 405 ) == l2cap_packets.ConfigurationResponseResult.SUCCESS 406 407 def is_correct_configuration_request(self, l2cap_packet): 408 packet_bytes = l2cap_packet.payload 409 l2cap_view = l2cap_packets.BasicFrameView( 410 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 411 if l2cap_view.GetChannelId() != 1: 412 return False 413 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 414 return l2cap_control_view.GetCode( 415 ) == l2cap_packets.CommandCode.CONFIGURATION_REQUEST 416 417 def is_correct_disconnection_request(self, l2cap_packet): 418 packet_bytes = l2cap_packet.payload 419 l2cap_view = l2cap_packets.BasicFrameView( 420 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 421 if l2cap_view.GetChannelId() != 1: 422 return False 423 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 424 return l2cap_control_view.GetCode( 425 ) == l2cap_packets.CommandCode.DISCONNECTION_REQUEST 426 427 def cert_send_b_frame(self, b_frame): 428 self.cert_device.hci_acl_manager.SendAclData( 429 acl_manager_facade.AclData( 430 handle=self.cert_acl_handle, 431 payload=bytes(b_frame.Serialize()))) 432 433 def get_req_seq_from_ertm_s_frame(self, scid, packet): 434 packet_bytes = packet.payload 435 l2cap_view = l2cap_packets.BasicFrameView( 436 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 437 if l2cap_view.GetChannelId() != scid: 438 return False 439 standard_view = l2cap_packets.StandardFrameView(l2cap_view) 440 if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME: 441 return False 442 s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view) 443 return s_frame.GetReqSeq() 444 445 def get_s_from_ertm_s_frame(self, scid, packet): 446 packet_bytes = packet.payload 447 l2cap_view = l2cap_packets.BasicFrameView( 448 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 449 if l2cap_view.GetChannelId() != scid: 450 return False 451 standard_view = l2cap_packets.StandardFrameView(l2cap_view) 452 if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME: 453 return False 454 s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view) 455 return s_frame.GetS() 456 457 def get_p_from_ertm_s_frame(self, scid, packet): 458 packet_bytes = packet.payload 459 l2cap_view = l2cap_packets.BasicFrameView( 460 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 461 if l2cap_view.GetChannelId() != scid: 462 return False 463 standard_view = l2cap_packets.StandardFrameView(l2cap_view) 464 if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME: 465 return False 466 s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view) 467 return s_frame.GetP() 468 469 def get_f_from_ertm_s_frame(self, scid, packet): 470 packet_bytes = packet.payload 471 l2cap_view = l2cap_packets.BasicFrameView( 472 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 473 if l2cap_view.GetChannelId() != scid: 474 return False 475 standard_view = l2cap_packets.StandardFrameView(l2cap_view) 476 if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME: 477 return False 478 s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view) 479 return s_frame.GetF() 480 481 def get_tx_seq_from_ertm_i_frame(self, scid, packet): 482 packet_bytes = packet.payload 483 l2cap_view = l2cap_packets.BasicFrameView( 484 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 485 if l2cap_view.GetChannelId() != scid: 486 return False 487 standard_view = l2cap_packets.StandardFrameView(l2cap_view) 488 if standard_view.GetFrameType() == l2cap_packets.FrameType.S_FRAME: 489 return False 490 i_frame = l2cap_packets.EnhancedInformationFrameView(standard_view) 491 return i_frame.GetTxSeq() 492 493 def get_payload_from_ertm_i_frame(self, scid, packet): 494 packet_bytes = packet.payload 495 l2cap_view = l2cap_packets.BasicFrameView( 496 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 497 if l2cap_view.GetChannelId() != scid: 498 return False 499 standard_view = l2cap_packets.StandardFrameView(l2cap_view) 500 if standard_view.GetFrameType() == l2cap_packets.FrameType.S_FRAME: 501 return False 502 i_frame = l2cap_packets.EnhancedInformationFrameView(standard_view) 503 return i_frame.GetPayload() # TODO(mylesgw): This doesn't work! 504 505 def _setup_link_from_cert(self): 506 507 self.device_under_test.neighbor.EnablePageScan( 508 neighbor_facade.EnableMsg(enabled=True)) 509 510 with EventCallbackStream( 511 self.cert_device.hci_acl_manager.CreateConnection( 512 acl_manager_facade.ConnectionMsg( 513 address_type=int( 514 hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS), 515 address=bytes(self.dut_address.address))) 516 ) as connection_event_stream: 517 518 connection_event_asserts = EventAsserts(connection_event_stream) 519 520 # Cert gets ConnectionComplete with a handle and sends ACL data 521 handle = 0xfff 522 523 def get_handle(packet): 524 packet_bytes = packet.event 525 if b'\x03\x0b\x00' in packet_bytes: 526 nonlocal handle 527 cc_view = hci_packets.ConnectionCompleteView( 528 hci_packets.EventPacketView( 529 bt_packets.PacketViewLittleEndian( 530 list(packet_bytes)))) 531 handle = cc_view.GetConnectionHandle() 532 return True 533 return False 534 535 connection_event_asserts.assert_event_occurs(get_handle) 536 537 self.cert_acl_handle = handle 538 return handle 539 540 def _open_channel( 541 self, 542 cert_acl_data_stream, 543 signal_id=1, 544 cert_acl_handle=0x1, 545 scid=0x0101, 546 psm=0x33, 547 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): 548 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 549 550 self.device_under_test.l2cap.SetDynamicChannel( 551 l2cap_facade_pb2.SetEnableDynamicChannelRequest( 552 psm=psm, retransmission_mode=mode)) 553 open_channel = l2cap_packets.ConnectionRequestBuilder( 554 signal_id, psm, scid) 555 open_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, open_channel) 556 self.cert_send_b_frame(open_channel_l2cap) 557 558 def verify_connection_response(packet): 559 packet_bytes = packet.payload 560 l2cap_view = l2cap_packets.BasicFrameView( 561 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 562 l2cap_control_view = l2cap_packets.ControlView( 563 l2cap_view.GetPayload()) 564 if l2cap_control_view.GetCode( 565 ) != l2cap_packets.CommandCode.CONNECTION_RESPONSE: 566 return False 567 connection_response_view = l2cap_packets.ConnectionResponseView( 568 l2cap_control_view) 569 return connection_response_view.GetSourceCid( 570 ) == scid and connection_response_view.GetResult( 571 ) == l2cap_packets.ConnectionResponseResult.SUCCESS and connection_response_view.GetDestinationCid( 572 ) != 0 573 574 cert_acl_data_asserts.assert_event_occurs(verify_connection_response) 575 576 def test_connect_dynamic_channel_and_send_data(self): 577 cert_acl_handle = self._setup_link_from_cert() 578 with EventCallbackStream( 579 self.cert_device.hci_acl_manager.FetchAclData( 580 empty_proto.Empty())) as cert_acl_data_stream: 581 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 582 cert_acl_data_stream.register_callback(self._handle_control_packet) 583 psm = 0x33 584 scid = 0x41 585 self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, 586 psm) 587 self.device_under_test.l2cap.SendDynamicChannelPacket( 588 l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) 589 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 590 cert_acl_data_asserts.assert_event_occurs( 591 lambda packet: b'abc' in packet.payload) 592 593 def test_fixed_channel(self): 594 cert_acl_handle = self._setup_link_from_cert() 595 self.device_under_test.l2cap.RegisterChannel( 596 l2cap_facade_pb2.RegisterChannelRequest(channel=2)) 597 asserts.skip("FIXME: Not working") 598 with EventCallbackStream( 599 self.cert_device.hci_acl_manager.FetchAclData( 600 empty_proto.Empty())) as cert_acl_data_stream: 601 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 602 self.device_under_test.l2cap.SendL2capPacket( 603 l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123")) 604 cert_acl_data_asserts.assert_event_occurs( 605 lambda packet: b'123' in packet.payload) 606 607 def test_open_two_channels(self): 608 cert_acl_handle = self._setup_link_from_cert() 609 with EventCallbackStream( 610 self.cert_device.hci_acl_manager.FetchAclData( 611 empty_proto.Empty())) as cert_acl_data_stream: 612 cert_acl_data_stream.register_callback(self._handle_control_packet) 613 self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, 0x41, 614 0x41) 615 self._open_channel(cert_acl_data_stream, 2, cert_acl_handle, 0x43, 616 0x43) 617 618 def test_connect_and_send_data_ertm_no_segmentation(self): 619 cert_acl_handle = self._setup_link_from_cert() 620 with EventCallbackStream( 621 self.cert_device.hci_acl_manager.FetchAclData( 622 empty_proto.Empty())) as cert_acl_data_stream: 623 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 624 cert_acl_data_stream.register_callback(self._handle_control_packet) 625 self.on_connection_response = self._on_connection_response_use_ertm 626 627 psm = 0x33 628 scid = 0x41 629 self._open_channel( 630 cert_acl_data_stream, 631 1, 632 cert_acl_handle, 633 scid, 634 psm, 635 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 636 637 # FIXME: Order shouldn't matter here 638 cert_acl_data_asserts.assert_event_occurs( 639 self.is_correct_configuration_response) 640 cert_acl_data_asserts.assert_event_occurs( 641 self.is_correct_configuration_request) 642 643 dcid = self.scid_to_dcid[scid] 644 645 self.device_under_test.l2cap.SendDynamicChannelPacket( 646 l2cap_facade_pb2.DynamicChannelPacket( 647 psm=psm, payload=b'abc' * 34)) 648 cert_acl_data_asserts.assert_event_occurs( 649 lambda packet: b'abc' * 34 in packet.payload) 650 651 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 652 dcid, 0, l2cap_packets.Final.NOT_SET, 1, 653 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 654 SAMPLE_PACKET) 655 self.cert_send_b_frame(i_frame) 656 657 def test_basic_operation_request_connection(self): 658 """ 659 L2CAP/COS/CED/BV-01-C [Request Connection] 660 Verify that the IUT is able to request the connection establishment for an L2CAP data channel and 661 initiate the configuration procedure. 662 """ 663 cert_acl_handle = self._setup_link_from_cert() 664 665 with EventCallbackStream( 666 self.cert_device.hci_acl_manager.FetchAclData( 667 empty_proto.Empty())) as cert_acl_data_stream: 668 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 669 cert_acl_data_stream.register_callback(self._handle_control_packet) 670 psm = 0x33 671 # TODO: Use another test case 672 self.device_under_test.l2cap.OpenChannel( 673 l2cap_facade_pb2.OpenChannelRequest( 674 remote=self.cert_address, psm=psm)) 675 cert_acl_data_asserts.assert_event_occurs( 676 self.is_correct_connection_request) 677 678 def test_accept_disconnect(self): 679 """ 680 L2CAP/COS/CED/BV-07-C 681 """ 682 cert_acl_handle = self._setup_link_from_cert() 683 684 with EventCallbackStream( 685 self.cert_device.hci_acl_manager.FetchAclData( 686 empty_proto.Empty())) as cert_acl_data_stream: 687 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 688 cert_acl_data_stream.register_callback(self._handle_control_packet) 689 690 scid = 0x41 691 psm = 0x33 692 self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, 693 psm) 694 695 dcid = self.scid_to_dcid[scid] 696 697 close_channel = l2cap_packets.DisconnectionRequestBuilder( 698 1, dcid, scid) 699 close_channel_l2cap = l2cap_packets.BasicFrameBuilder( 700 1, close_channel) 701 self.cert_send_b_frame(close_channel_l2cap) 702 703 def verify_disconnection_response(packet): 704 packet_bytes = packet.payload 705 l2cap_view = l2cap_packets.BasicFrameView( 706 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 707 l2cap_control_view = l2cap_packets.ControlView( 708 l2cap_view.GetPayload()) 709 if l2cap_control_view.GetCode( 710 ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE: 711 return False 712 disconnection_response_view = l2cap_packets.DisconnectionResponseView( 713 l2cap_control_view) 714 return disconnection_response_view.GetSourceCid( 715 ) == scid and disconnection_response_view.GetDestinationCid( 716 ) == dcid 717 718 cert_acl_data_asserts.assert_event_occurs( 719 verify_disconnection_response) 720 721 def test_disconnect_on_timeout(self): 722 """ 723 L2CAP/COS/CED/BV-08-C 724 """ 725 cert_acl_handle = self._setup_link_from_cert() 726 727 with EventCallbackStream( 728 self.cert_device.hci_acl_manager.FetchAclData( 729 empty_proto.Empty())) as cert_acl_data_stream: 730 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 731 scid = 0x41 732 psm = 0x33 733 cert_acl_data_stream.register_callback(self._handle_control_packet) 734 735 # Don't send configuration request or response back 736 self.on_configuration_request = lambda _: True 737 self.on_connection_response = lambda _: True 738 739 self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, 740 psm) 741 742 def is_configuration_response(l2cap_packet): 743 packet_bytes = l2cap_packet.payload 744 l2cap_view = l2cap_packets.BasicFrameView( 745 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 746 if l2cap_view.GetChannelId() != 1: 747 return False 748 l2cap_control_view = l2cap_packets.ControlView( 749 l2cap_view.GetPayload()) 750 return l2cap_control_view.GetCode( 751 ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE 752 753 cert_acl_data_asserts.assert_none_matching( 754 is_configuration_response) 755 756 def test_respond_to_echo_request(self): 757 """ 758 L2CAP/COS/ECH/BV-01-C [Respond to Echo Request] 759 Verify that the IUT responds to an echo request. 760 """ 761 cert_acl_handle = self._setup_link_from_cert() 762 with EventCallbackStream( 763 self.cert_device.hci_acl_manager.FetchAclData( 764 empty_proto.Empty())) as cert_acl_data_stream: 765 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 766 cert_acl_data_stream.register_callback(self._handle_control_packet) 767 768 echo_request = l2cap_packets.EchoRequestBuilder( 769 100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3)) 770 echo_request_l2cap = l2cap_packets.BasicFrameBuilder( 771 1, echo_request) 772 self.cert_send_b_frame(echo_request_l2cap) 773 774 cert_acl_data_asserts.assert_event_occurs( 775 lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload 776 ) 777 778 def test_reject_unknown_command(self): 779 """ 780 L2CAP/COS/CED/BI-01-C 781 """ 782 cert_acl_handle = self._setup_link_from_cert() 783 with EventCallbackStream( 784 self.cert_device.hci_acl_manager.FetchAclData( 785 empty_proto.Empty())) as cert_acl_data_stream: 786 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 787 cert_acl_data_stream.register_callback(self._handle_control_packet) 788 789 invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00" 790 self.cert_device.hci_acl_manager.SendAclData( 791 acl_manager_facade.AclData( 792 handle=cert_acl_handle, 793 payload=bytes(invalid_command_packet))) 794 795 def is_command_reject(l2cap_packet): 796 packet_bytes = l2cap_packet.payload 797 l2cap_view = l2cap_packets.BasicFrameView( 798 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 799 if l2cap_view.GetChannelId() != 1: 800 return False 801 l2cap_control_view = l2cap_packets.ControlView( 802 l2cap_view.GetPayload()) 803 return l2cap_control_view.GetCode( 804 ) == l2cap_packets.CommandCode.COMMAND_REJECT 805 806 cert_acl_data_asserts.assert_event_occurs(is_command_reject) 807 808 def test_query_for_1_2_features(self): 809 """ 810 L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features] 811 """ 812 cert_acl_handle = self._setup_link_from_cert() 813 with EventCallbackStream( 814 self.cert_device.hci_acl_manager.FetchAclData( 815 empty_proto.Empty())) as cert_acl_data_stream: 816 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 817 cert_acl_data_asserts_alt = EventAsserts(cert_acl_data_stream) 818 cert_acl_data_stream.register_callback(self._handle_control_packet) 819 signal_id = 3 820 information_request = l2cap_packets.InformationRequestBuilder( 821 signal_id, l2cap_packets.InformationRequestInfoType. 822 EXTENDED_FEATURES_SUPPORTED) 823 information_request_l2cap = l2cap_packets.BasicFrameBuilder( 824 1, information_request) 825 self.cert_send_b_frame(information_request_l2cap) 826 827 def is_correct_information_response(l2cap_packet): 828 packet_bytes = l2cap_packet.payload 829 l2cap_view = l2cap_packets.BasicFrameView( 830 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 831 if l2cap_view.GetChannelId() != 1: 832 return False 833 l2cap_control_view = l2cap_packets.ControlView( 834 l2cap_view.GetPayload()) 835 if l2cap_control_view.GetCode( 836 ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: 837 return False 838 information_response_view = l2cap_packets.InformationResponseView( 839 l2cap_control_view) 840 return information_response_view.GetInfoType( 841 ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED 842 843 cert_acl_data_asserts.assert_event_occurs( 844 is_correct_information_response) 845 846 def is_correct_information_request(l2cap_packet): 847 packet_bytes = l2cap_packet.payload 848 l2cap_view = l2cap_packets.BasicFrameView( 849 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 850 if l2cap_view.GetChannelId() != 1: 851 return False 852 l2cap_control_view = l2cap_packets.ControlView( 853 l2cap_view.GetPayload()) 854 if l2cap_control_view.GetCode( 855 ) != l2cap_packets.CommandCode.INFORMATION_REQUEST: 856 return False 857 information_request_view = l2cap_packets.InformationRequestView( 858 l2cap_control_view) 859 return information_request_view.GetInfoType( 860 ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED 861 862 cert_acl_data_asserts_alt.assert_event_occurs( 863 is_correct_information_request) 864 865 def test_extended_feature_info_response_ertm(self): 866 """ 867 L2CAP/EXF/BV-01-C [Extended Features Information Response for Enhanced 868 Retransmission Mode] 869 """ 870 cert_acl_handle = self._setup_link_from_cert() 871 with EventCallbackStream( 872 self.cert_device.hci_acl_manager.FetchAclData( 873 empty_proto.Empty())) as cert_acl_data_stream: 874 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 875 cert_acl_data_stream.register_callback(self._handle_control_packet) 876 877 signal_id = 3 878 information_request = l2cap_packets.InformationRequestBuilder( 879 signal_id, l2cap_packets.InformationRequestInfoType. 880 EXTENDED_FEATURES_SUPPORTED) 881 information_request_l2cap = l2cap_packets.BasicFrameBuilder( 882 1, information_request) 883 self.cert_send_b_frame(information_request_l2cap) 884 885 def is_correct_information_response(l2cap_packet): 886 packet_bytes = l2cap_packet.payload 887 l2cap_view = l2cap_packets.BasicFrameView( 888 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 889 if l2cap_view.GetChannelId() != 1: 890 return False 891 l2cap_control_view = l2cap_packets.ControlView( 892 l2cap_view.GetPayload()) 893 if l2cap_control_view.GetCode( 894 ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: 895 return False 896 information_response_view = l2cap_packets.InformationResponseView( 897 l2cap_control_view) 898 if information_response_view.GetInfoType( 899 ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: 900 return False 901 extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( 902 information_response_view) 903 return extended_features_view.GetEnhancedRetransmissionMode() 904 905 cert_acl_data_asserts.assert_event_occurs( 906 is_correct_information_response) 907 908 def test_extended_feature_info_response_fcs(self): 909 """ 910 L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option] 911 Note: This is not mandated by L2CAP Spec 912 """ 913 cert_acl_handle = self._setup_link_from_cert() 914 with EventCallbackStream( 915 self.cert_device.hci_acl_manager.FetchAclData( 916 empty_proto.Empty())) as cert_acl_data_stream: 917 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 918 cert_acl_data_stream.register_callback(self._handle_control_packet) 919 920 signal_id = 3 921 information_request = l2cap_packets.InformationRequestBuilder( 922 signal_id, l2cap_packets.InformationRequestInfoType. 923 EXTENDED_FEATURES_SUPPORTED) 924 information_request_l2cap = l2cap_packets.BasicFrameBuilder( 925 1, information_request) 926 self.cert_send_b_frame(information_request_l2cap) 927 928 def is_correct_information_response(l2cap_packet): 929 packet_bytes = l2cap_packet.payload 930 l2cap_view = l2cap_packets.BasicFrameView( 931 bt_packets.PacketViewLittleEndian(list(packet_bytes))) 932 if l2cap_view.GetChannelId() != 1: 933 return False 934 l2cap_control_view = l2cap_packets.ControlView( 935 l2cap_view.GetPayload()) 936 if l2cap_control_view.GetCode( 937 ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE: 938 return False 939 information_response_view = l2cap_packets.InformationResponseView( 940 l2cap_control_view) 941 if information_response_view.GetInfoType( 942 ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: 943 return False 944 extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView( 945 information_response_view) 946 return extended_features_view.GetFcsOption() 947 948 cert_acl_data_asserts.assert_event_occurs( 949 is_correct_information_response) 950 951 def test_config_channel_not_use_FCS(self): 952 """ 953 L2CAP/FOC/BV-01-C [IUT Initiated Configuration of the FCS Option] 954 Verify the IUT can configure a channel to not use FCS in I/S-frames. 955 """ 956 cert_acl_handle = self._setup_link_from_cert() 957 with EventCallbackStream( 958 self.cert_device.hci_acl_manager.FetchAclData( 959 empty_proto.Empty())) as cert_acl_data_stream: 960 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 961 cert_acl_data_stream.register_callback(self._handle_control_packet) 962 963 self.on_connection_response = self._on_connection_response_use_ertm 964 965 psm = 0x33 966 scid = 0x41 967 self._open_channel( 968 cert_acl_data_stream, 969 1, 970 cert_acl_handle, 971 scid, 972 psm, 973 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 974 # FIXME: Order shouldn't matter here 975 cert_acl_data_asserts.assert_event_occurs( 976 self.is_correct_configuration_response) 977 cert_acl_data_asserts.assert_event_occurs( 978 self.is_correct_configuration_request) 979 980 self.device_under_test.l2cap.SendDynamicChannelPacket( 981 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 982 cert_acl_data_asserts.assert_event_occurs( 983 lambda packet: b"abc" in packet.payload) 984 985 def test_explicitly_request_use_FCS(self): 986 """ 987 L2CAP/FOC/BV-02-C [Lower Tester Explicitly Requests FCS should be Used] 988 Verify the IUT will include the FCS in I/S-frames if the Lower Tester explicitly requests that FCS 989 should be used. 990 """ 991 992 cert_acl_handle = self._setup_link_from_cert() 993 with EventCallbackStream( 994 self.cert_device.hci_acl_manager.FetchAclData( 995 empty_proto.Empty())) as cert_acl_data_stream: 996 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 997 cert_acl_data_stream.register_callback(self._handle_control_packet) 998 999 self.on_connection_response = self._on_connection_response_use_ertm_and_fcs 1000 psm = 0x33 1001 scid = 0x41 1002 self._open_channel( 1003 cert_acl_data_stream, 1004 1, 1005 cert_acl_handle, 1006 scid, 1007 psm, 1008 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1009 # FIXME: Order shouldn't matter here 1010 cert_acl_data_asserts.assert_event_occurs( 1011 self.is_correct_configuration_response) 1012 cert_acl_data_asserts.assert_event_occurs( 1013 self.is_correct_configuration_request) 1014 1015 self.device_under_test.l2cap.SendDynamicChannelPacket( 1016 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1017 cert_acl_data_asserts.assert_event_occurs( 1018 lambda packet: b"abc\x4f\xa3" in packet.payload 1019 ) # TODO: Use packet parser 1020 1021 def test_implicitly_request_use_FCS(self): 1022 """ 1023 L2CAP/FOC/BV-03-C [Lower Tester Implicitly Requests FCS should be Used] 1024 TODO: Update this test case. What's the difference between this one and test_explicitly_request_use_FCS? 1025 """ 1026 cert_acl_handle = self._setup_link_from_cert() 1027 with EventCallbackStream( 1028 self.cert_device.hci_acl_manager.FetchAclData( 1029 empty_proto.Empty())) as cert_acl_data_stream: 1030 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1031 cert_acl_data_stream.register_callback(self._handle_control_packet) 1032 1033 self.on_connection_response = self._on_connection_response_use_ertm_and_fcs 1034 psm = 0x33 1035 scid = 0x41 1036 self._open_channel( 1037 cert_acl_data_stream, 1038 1, 1039 cert_acl_handle, 1040 scid, 1041 psm, 1042 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1043 # FIXME: Order shouldn't matter here 1044 cert_acl_data_asserts.assert_event_occurs( 1045 self.is_correct_configuration_response) 1046 cert_acl_data_asserts.assert_event_occurs( 1047 self.is_correct_configuration_request) 1048 1049 self.device_under_test.l2cap.SendDynamicChannelPacket( 1050 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1051 cert_acl_data_asserts.assert_event_occurs( 1052 lambda packet: b"abc\x4f\xa3" in packet.payload 1053 ) # TODO: Use packet parser 1054 1055 def test_transmit_i_frames(self): 1056 """ 1057 L2CAP/ERM/BV-01-C [Transmit I-frames] 1058 """ 1059 cert_acl_handle = self._setup_link_from_cert() 1060 with EventCallbackStream( 1061 self.cert_device.hci_acl_manager.FetchAclData( 1062 empty_proto.Empty())) as cert_acl_data_stream: 1063 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1064 cert_acl_data_stream.register_callback(self._handle_control_packet) 1065 1066 self.on_connection_response = self._on_connection_response_use_ertm 1067 psm = 0x33 1068 scid = 0x41 1069 self._open_channel( 1070 cert_acl_data_stream, 1071 1, 1072 cert_acl_handle, 1073 scid, 1074 psm, 1075 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1076 1077 dcid = self.scid_to_dcid[scid] 1078 1079 # FIXME: Order shouldn't matter here 1080 cert_acl_data_asserts.assert_event_occurs( 1081 self.is_correct_configuration_response) 1082 cert_acl_data_asserts.assert_event_occurs( 1083 self.is_correct_configuration_request) 1084 1085 self.device_under_test.l2cap.SendDynamicChannelPacket( 1086 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1087 cert_acl_data_asserts.assert_event_occurs( 1088 lambda packet: b"abc" in packet.payload) 1089 1090 # Assemble a sample packet. TODO: Use RawBuilder 1091 SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1) 1092 1093 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1094 dcid, 0, l2cap_packets.Final.NOT_SET, 1, 1095 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1096 SAMPLE_PACKET) 1097 self.cert_send_b_frame(i_frame) 1098 1099 self.device_under_test.l2cap.SendDynamicChannelPacket( 1100 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1101 cert_acl_data_asserts.assert_event_occurs( 1102 lambda packet: b"abc" in packet.payload) 1103 1104 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1105 dcid, 1, l2cap_packets.Final.NOT_SET, 2, 1106 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1107 SAMPLE_PACKET) 1108 self.cert_send_b_frame(i_frame) 1109 1110 self.device_under_test.l2cap.SendDynamicChannelPacket( 1111 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1112 cert_acl_data_asserts.assert_event_occurs( 1113 lambda packet: b"abc" in packet.payload) 1114 1115 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1116 dcid, 2, l2cap_packets.Final.NOT_SET, 3, 1117 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1118 SAMPLE_PACKET) 1119 self.cert_send_b_frame(i_frame) 1120 1121 def test_receive_i_frames(self): 1122 """ 1123 L2CAP/ERM/BV-02-C [Receive I-Frames] 1124 Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP SDUs to the Upper Tester 1125 """ 1126 cert_acl_handle = self._setup_link_from_cert() 1127 with EventCallbackStream( 1128 self.cert_device.hci_acl_manager.FetchAclData( 1129 empty_proto.Empty())) as cert_acl_data_stream: 1130 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1131 cert_acl_data_stream.register_callback(self._handle_control_packet) 1132 self.on_connection_response = self._on_connection_response_use_ertm 1133 1134 psm = 0x33 1135 scid = 0x41 1136 self._open_channel( 1137 cert_acl_data_stream, 1138 1, 1139 cert_acl_handle, 1140 scid, 1141 psm, 1142 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1143 1144 dcid = self.scid_to_dcid[scid] 1145 1146 # FIXME: Order shouldn't matter here 1147 cert_acl_data_asserts.assert_event_occurs( 1148 self.is_correct_configuration_response) 1149 cert_acl_data_asserts.assert_event_occurs( 1150 self.is_correct_configuration_request) 1151 1152 for i in range(3): 1153 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1154 dcid, i, l2cap_packets.Final.NOT_SET, 0, 1155 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1156 SAMPLE_PACKET) 1157 self.cert_send_b_frame(i_frame) 1158 cert_acl_data_asserts.assert_event_occurs( 1159 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1 1160 ) 1161 1162 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1163 dcid, 3, l2cap_packets.Final.NOT_SET, 0, 1164 l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET) 1165 self.cert_send_b_frame(i_frame) 1166 cert_acl_data_asserts.assert_event_occurs( 1167 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4 1168 ) 1169 1170 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1171 dcid, 4, l2cap_packets.Final.NOT_SET, 0, 1172 l2cap_packets.SegmentationAndReassembly.CONTINUATION, 1173 SAMPLE_PACKET) 1174 self.cert_send_b_frame(i_frame) 1175 cert_acl_data_asserts.assert_event_occurs( 1176 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5 1177 ) 1178 1179 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1180 dcid, 5, l2cap_packets.Final.NOT_SET, 0, 1181 l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET) 1182 self.cert_send_b_frame(i_frame) 1183 cert_acl_data_asserts.assert_event_occurs( 1184 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6 1185 ) 1186 1187 def test_acknowledging_received_i_frames(self): 1188 """ 1189 L2CAP/ERM/BV-03-C [Acknowledging Received I-Frames] 1190 Verify the IUT sends S-frame [RR] with the Poll bit not set to acknowledge data received from the 1191 Lower Tester 1192 """ 1193 cert_acl_handle = self._setup_link_from_cert() 1194 with EventCallbackStream( 1195 self.cert_device.hci_acl_manager.FetchAclData( 1196 empty_proto.Empty())) as cert_acl_data_stream: 1197 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1198 cert_acl_data_stream.register_callback(self._handle_control_packet) 1199 self.on_connection_response = self._on_connection_response_use_ertm 1200 1201 psm = 0x33 1202 scid = 0x41 1203 self._open_channel( 1204 cert_acl_data_stream, 1205 1, 1206 cert_acl_handle, 1207 scid, 1208 psm, 1209 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1210 1211 dcid = self.scid_to_dcid[scid] 1212 1213 # FIXME: Order shouldn't matter here 1214 cert_acl_data_asserts.assert_event_occurs( 1215 self.is_correct_configuration_response) 1216 cert_acl_data_asserts.assert_event_occurs( 1217 self.is_correct_configuration_request) 1218 1219 for i in range(3): 1220 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1221 dcid, i, l2cap_packets.Final.NOT_SET, 0, 1222 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1223 SAMPLE_PACKET) 1224 self.cert_send_b_frame(i_frame) 1225 cert_acl_data_asserts.assert_event_occurs( 1226 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1 1227 ) 1228 1229 cert_acl_data_asserts.assert_none_matching( 1230 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4, 1231 timedelta(seconds=1)) 1232 1233 def test_resume_transmitting_when_received_rr(self): 1234 """ 1235 L2CAP/ERM/BV-05-C [Resume Transmitting I-Frames when an S-Frame [RR] is Received] 1236 Verify the IUT will cease transmission of I-frames when the negotiated TxWindow is full. Verify the 1237 IUT will resume transmission of I-frames when an S-frame [RR] is received that acknowledges 1238 previously sent I-frames. 1239 """ 1240 self.ertm_tx_window_size = 1 1241 cert_acl_handle = self._setup_link_from_cert() 1242 with EventCallbackStream( 1243 self.cert_device.hci_acl_manager.FetchAclData( 1244 empty_proto.Empty())) as cert_acl_data_stream: 1245 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1246 cert_acl_data_stream.register_callback(self._handle_control_packet) 1247 self.on_connection_response = self._on_connection_response_use_ertm 1248 1249 psm = 0x33 1250 scid = 0x41 1251 self._open_channel( 1252 cert_acl_data_stream, 1253 1, 1254 cert_acl_handle, 1255 scid, 1256 psm, 1257 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1258 1259 dcid = self.scid_to_dcid[scid] 1260 1261 # FIXME: Order shouldn't matter here 1262 cert_acl_data_asserts.assert_event_occurs( 1263 self.is_correct_configuration_response) 1264 cert_acl_data_asserts.assert_event_occurs( 1265 self.is_correct_configuration_request) 1266 1267 self.device_under_test.l2cap.SendDynamicChannelPacket( 1268 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1269 self.device_under_test.l2cap.SendDynamicChannelPacket( 1270 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def')) 1271 1272 # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view 1273 cert_acl_data_asserts.assert_event_occurs( 1274 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1275 ) 1276 cert_acl_data_asserts.assert_none_matching( 1277 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1, 1278 ) 1279 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1280 dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, 1281 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1282 1) 1283 self.cert_send_b_frame(s_frame) 1284 cert_acl_data_asserts.assert_event_occurs( 1285 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1 1286 ) 1287 1288 def test_resume_transmitting_when_acknowledge_previously_sent(self): 1289 """ 1290 L2CAP/ERM/BV-06-C [Resume Transmitting I-Frames when an I-Frame is Received] 1291 Verify the IUT will cease transmission of I-frames when the negotiated TxWindow is full. Verify the 1292 IUT will resume transmission of I-frames when an I-frame is received that acknowledges previously 1293 sent I-frames. 1294 """ 1295 self.ertm_tx_window_size = 1 1296 cert_acl_handle = self._setup_link_from_cert() 1297 with EventCallbackStream( 1298 self.cert_device.hci_acl_manager.FetchAclData( 1299 empty_proto.Empty())) as cert_acl_data_stream: 1300 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1301 cert_acl_data_stream.register_callback(self._handle_control_packet) 1302 self.on_connection_response = self._on_connection_response_use_ertm 1303 1304 psm = 0x33 1305 scid = 0x41 1306 self._open_channel( 1307 cert_acl_data_stream, 1308 1, 1309 cert_acl_handle, 1310 scid, 1311 psm, 1312 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1313 1314 dcid = self.scid_to_dcid[scid] 1315 1316 # FIXME: Order shouldn't matter here 1317 cert_acl_data_asserts.assert_event_occurs( 1318 self.is_correct_configuration_response) 1319 cert_acl_data_asserts.assert_event_occurs( 1320 self.is_correct_configuration_request) 1321 1322 self.device_under_test.l2cap.SendDynamicChannelPacket( 1323 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1324 self.device_under_test.l2cap.SendDynamicChannelPacket( 1325 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def')) 1326 1327 cert_acl_data_asserts.assert_event_occurs( 1328 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1329 ) 1330 # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout 1331 cert_acl_data_asserts.assert_none_matching( 1332 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1, 1333 timedelta(seconds=1)) 1334 1335 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1336 dcid, 0, l2cap_packets.Final.NOT_SET, 1, 1337 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1338 SAMPLE_PACKET) 1339 self.cert_send_b_frame(i_frame) 1340 1341 cert_acl_data_asserts.assert_event_occurs( 1342 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1 1343 ) 1344 1345 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1346 dcid, 1, l2cap_packets.Final.NOT_SET, 2, 1347 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1348 SAMPLE_PACKET) 1349 self.cert_send_b_frame(i_frame) 1350 1351 def test_transmit_s_frame_rr_with_poll_bit_set(self): 1352 """ 1353 L2CAP/ERM/BV-08-C [Send S-Frame [RR] with Poll Bit Set] 1354 Verify the IUT sends an S-frame [RR] with the Poll bit set when its retransmission timer expires. 1355 """ 1356 cert_acl_handle = self._setup_link_from_cert() 1357 with EventCallbackStream( 1358 self.cert_device.hci_acl_manager.FetchAclData( 1359 empty_proto.Empty())) as cert_acl_data_stream: 1360 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1361 cert_acl_data_stream.register_callback(self._handle_control_packet) 1362 self.on_connection_response = self._on_connection_response_use_ertm 1363 1364 psm = 0x33 1365 scid = 0x41 1366 self._open_channel( 1367 cert_acl_data_stream, 1368 1, 1369 cert_acl_handle, 1370 scid, 1371 psm, 1372 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1373 1374 # FIXME: Order shouldn't matter here 1375 cert_acl_data_asserts.assert_event_occurs( 1376 self.is_correct_configuration_response) 1377 cert_acl_data_asserts.assert_event_occurs( 1378 self.is_correct_configuration_request) 1379 1380 self.device_under_test.l2cap.SendDynamicChannelPacket( 1381 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1382 # TODO: Always use their retransmission timeout value 1383 time.sleep(2) 1384 cert_acl_data_asserts.assert_event_occurs( 1385 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL 1386 ) 1387 1388 def test_transmit_s_frame_rr_with_final_bit_set(self): 1389 """ 1390 L2CAP/ERM/BV-09-C [Send S-Frame [RR] with Final Bit Set] 1391 Verify the IUT responds with an S-frame [RR] with the Final bit set after receiving an S-frame [RR] 1392 with the Poll bit set. 1393 """ 1394 cert_acl_handle = self._setup_link_from_cert() 1395 with EventCallbackStream( 1396 self.cert_device.hci_acl_manager.FetchAclData( 1397 empty_proto.Empty())) as cert_acl_data_stream: 1398 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1399 cert_acl_data_stream.register_callback(self._handle_control_packet) 1400 self.on_connection_response = self._on_connection_response_use_ertm 1401 1402 psm = 0x33 1403 scid = 0x41 1404 self._open_channel( 1405 cert_acl_data_stream, 1406 1, 1407 cert_acl_handle, 1408 scid, 1409 psm, 1410 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1411 1412 # FIXME: Order shouldn't matter here 1413 cert_acl_data_asserts.assert_event_occurs( 1414 self.is_correct_configuration_response) 1415 cert_acl_data_asserts.assert_event_occurs( 1416 self.is_correct_configuration_request) 1417 1418 dcid = self.scid_to_dcid[scid] 1419 1420 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1421 dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, 1422 l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0) 1423 self.cert_send_b_frame(s_frame) 1424 1425 cert_acl_data_asserts.assert_event_occurs( 1426 lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE 1427 ) 1428 1429 def test_s_frame_transmissions_exceed_max_transmit(self): 1430 """ 1431 L2CAP/ERM/BV-11-C [S-Frame Transmissions Exceed MaxTransmit] 1432 Verify the IUT will close the channel when the Monitor Timer expires. 1433 """ 1434 asserts.skip("Need to configure DUT to have a shorter timer") 1435 cert_acl_handle = self._setup_link_from_cert() 1436 with EventCallbackStream( 1437 self.cert_device.hci_acl_manager.FetchAclData( 1438 empty_proto.Empty())) as cert_acl_data_stream: 1439 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1440 cert_acl_data_stream.register_callback(self._handle_control_packet) 1441 self.on_connection_response = self._on_connection_response_use_ertm 1442 1443 psm = 0x33 1444 scid = 0x41 1445 self._open_channel( 1446 cert_acl_data_stream, 1447 1, 1448 cert_acl_handle, 1449 scid, 1450 psm, 1451 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1452 1453 # FIXME: Order shouldn't matter here 1454 cert_acl_data_asserts.assert_event_occurs( 1455 self.is_correct_configuration_response) 1456 cert_acl_data_asserts.assert_event_occurs( 1457 self.is_correct_configuration_request) 1458 1459 dcid = self.scid_to_dcid[scid] 1460 1461 self.device_under_test.l2cap.SendDynamicChannelPacket( 1462 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1463 1464 # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362 1465 time.sleep(362) 1466 cert_acl_data_asserts.assert_event_occurs( 1467 self.is_correct_disconnection_request) 1468 1469 def test_i_frame_transmissions_exceed_max_transmit(self): 1470 """ 1471 L2CAP/ERM/BV-12-C [I-Frame Transmissions Exceed MaxTransmit] 1472 Verify the IUT will close the channel when it receives an S-frame [RR] with the final bit set that does 1473 not acknowledge the previous I-frame sent by the IUT. 1474 """ 1475 cert_acl_handle = self._setup_link_from_cert() 1476 with EventCallbackStream( 1477 self.cert_device.hci_acl_manager.FetchAclData( 1478 empty_proto.Empty())) as cert_acl_data_stream: 1479 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1480 cert_acl_data_stream.register_callback(self._handle_control_packet) 1481 self.on_connection_response = self._on_connection_response_use_ertm 1482 1483 psm = 0x33 1484 scid = 0x41 1485 self._open_channel( 1486 cert_acl_data_stream, 1487 1, 1488 cert_acl_handle, 1489 scid, 1490 psm, 1491 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1492 1493 # FIXME: Order shouldn't matter here 1494 cert_acl_data_asserts.assert_event_occurs( 1495 self.is_correct_configuration_response) 1496 cert_acl_data_asserts.assert_event_occurs( 1497 self.is_correct_configuration_request) 1498 1499 dcid = self.scid_to_dcid[scid] 1500 1501 self.device_under_test.l2cap.SendDynamicChannelPacket( 1502 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1503 1504 cert_acl_data_asserts.assert_event_occurs( 1505 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1506 ) 1507 1508 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1509 dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, 1510 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1511 0) 1512 self.cert_send_b_frame(s_frame) 1513 1514 cert_acl_data_asserts.assert_event_occurs( 1515 self.is_correct_disconnection_request) 1516 1517 def test_respond_to_rej(self): 1518 """ 1519 L2CAP/ERM/BV-13-C [Respond to S-Frame [REJ]] 1520 Verify the IUT retransmits I-frames starting from the sequence number specified in the S-frame [REJ]. 1521 """ 1522 self.ertm_tx_window_size = 2 1523 self.ertm_max_transmit = 2 1524 cert_acl_handle = self._setup_link_from_cert() 1525 with EventCallbackStream( 1526 self.cert_device.hci_acl_manager.FetchAclData( 1527 empty_proto.Empty())) as cert_acl_data_stream: 1528 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1529 cert_acl_data_stream.register_callback(self._handle_control_packet) 1530 self.on_connection_response = self._on_connection_response_use_ertm 1531 1532 psm = 0x33 1533 scid = 0x41 1534 self._open_channel( 1535 cert_acl_data_stream, 1536 1, 1537 cert_acl_handle, 1538 scid, 1539 psm, 1540 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1541 1542 # FIXME: Order shouldn't matter here 1543 cert_acl_data_asserts.assert_event_occurs( 1544 self.is_correct_configuration_response) 1545 cert_acl_data_asserts.assert_event_occurs( 1546 self.is_correct_configuration_request) 1547 1548 dcid = self.scid_to_dcid[scid] 1549 1550 self.device_under_test.l2cap.SendDynamicChannelPacket( 1551 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1552 self.device_under_test.l2cap.SendDynamicChannelPacket( 1553 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1554 for i in range(2): 1555 cert_acl_data_asserts.assert_event_occurs( 1556 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i, 1557 timeout=timedelta(seconds=0.5)) 1558 1559 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1560 dcid, l2cap_packets.SupervisoryFunction.REJECT, 1561 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0) 1562 self.cert_send_b_frame(s_frame) 1563 1564 for i in range(2): 1565 cert_acl_data_asserts.assert_event_occurs( 1566 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i, 1567 timeout=timedelta(seconds=0.5)) 1568 1569 def test_receive_s_frame_rr_final_bit_set(self): 1570 """ 1571 L2CAP/ERM/BV-18-C [Receive S-Frame [RR] Final Bit = 1] 1572 Verify the IUT will retransmit any previously sent I-frames unacknowledged by receipt of an S-Frame 1573 [RR] with the Final Bit set. 1574 """ 1575 cert_acl_handle = self._setup_link_from_cert() 1576 with EventCallbackStream( 1577 self.cert_device.hci_acl_manager.FetchAclData( 1578 empty_proto.Empty())) as cert_acl_data_stream: 1579 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1580 cert_acl_data_stream.register_callback(self._handle_control_packet) 1581 self.on_connection_response = self._on_connection_response_use_ertm 1582 1583 psm = 0x33 1584 scid = 0x41 1585 self._open_channel( 1586 cert_acl_data_stream, 1587 1, 1588 cert_acl_handle, 1589 scid, 1590 psm, 1591 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1592 1593 # FIXME: Order shouldn't matter here 1594 cert_acl_data_asserts.assert_event_occurs( 1595 self.is_correct_configuration_response) 1596 cert_acl_data_asserts.assert_event_occurs( 1597 self.is_correct_configuration_request) 1598 1599 dcid = self.scid_to_dcid[scid] 1600 1601 self.device_under_test.l2cap.SendDynamicChannelPacket( 1602 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1603 1604 # TODO: Always use their retransmission timeout value 1605 time.sleep(2) 1606 cert_acl_data_asserts.assert_event_occurs( 1607 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL 1608 ) 1609 1610 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1611 dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, 1612 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1613 0) 1614 self.cert_send_b_frame(s_frame) 1615 1616 cert_acl_data_asserts.assert_event_occurs( 1617 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1618 ) 1619 1620 def test_receive_i_frame_final_bit_set(self): 1621 """ 1622 L2CAP/ERM/BV-19-C [Receive I-Frame Final Bit = 1] 1623 Verify the IUT will retransmit any previously sent I-frames unacknowledged by receipt of an I-frame 1624 with the final bit set. 1625 """ 1626 cert_acl_handle = self._setup_link_from_cert() 1627 with EventCallbackStream( 1628 self.cert_device.hci_acl_manager.FetchAclData( 1629 empty_proto.Empty())) as cert_acl_data_stream: 1630 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1631 cert_acl_data_stream.register_callback(self._handle_control_packet) 1632 self.on_connection_response = self._on_connection_response_use_ertm 1633 1634 psm = 0x33 1635 scid = 0x41 1636 self._open_channel( 1637 cert_acl_data_stream, 1638 1, 1639 cert_acl_handle, 1640 scid, 1641 psm, 1642 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1643 1644 # FIXME: Order shouldn't matter here 1645 cert_acl_data_asserts.assert_event_occurs( 1646 self.is_correct_configuration_response) 1647 cert_acl_data_asserts.assert_event_occurs( 1648 self.is_correct_configuration_request) 1649 1650 dcid = self.scid_to_dcid[scid] 1651 1652 self.device_under_test.l2cap.SendDynamicChannelPacket( 1653 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1654 1655 # TODO: Always use their retransmission timeout value 1656 time.sleep(2) 1657 cert_acl_data_asserts.assert_event_occurs( 1658 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL 1659 ) 1660 1661 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1662 dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0, 1663 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1664 SAMPLE_PACKET) 1665 self.cert_send_b_frame(i_frame) 1666 1667 cert_acl_data_asserts.assert_event_occurs( 1668 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1669 ) 1670 1671 def test_recieve_rnr(self): 1672 """ 1673 L2CAP/ERM/BV-20-C [Enter Remote Busy Condition] 1674 Verify the IUT will not retransmit any I-frames when it receives a remote busy indication from the 1675 Lower Tester (S-frame [RNR]). 1676 """ 1677 cert_acl_handle = self._setup_link_from_cert() 1678 with EventCallbackStream( 1679 self.cert_device.hci_acl_manager.FetchAclData( 1680 empty_proto.Empty())) as cert_acl_data_stream: 1681 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1682 cert_acl_data_stream.register_callback(self._handle_control_packet) 1683 self.on_connection_response = self._on_connection_response_use_ertm 1684 1685 psm = 0x33 1686 scid = 0x41 1687 self._open_channel( 1688 cert_acl_data_stream, 1689 1, 1690 cert_acl_handle, 1691 scid, 1692 psm, 1693 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1694 1695 # FIXME: Order shouldn't matter here 1696 cert_acl_data_asserts.assert_event_occurs( 1697 self.is_correct_configuration_response) 1698 cert_acl_data_asserts.assert_event_occurs( 1699 self.is_correct_configuration_request) 1700 1701 dcid = self.scid_to_dcid[scid] 1702 1703 self.device_under_test.l2cap.SendDynamicChannelPacket( 1704 l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc')) 1705 1706 # TODO: Always use their retransmission timeout value 1707 time.sleep(2) 1708 cert_acl_data_asserts.assert_event_occurs( 1709 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL 1710 ) 1711 1712 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1713 dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY, 1714 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1715 0) 1716 self.cert_send_b_frame(s_frame) 1717 1718 cert_acl_data_asserts.assert_none_matching( 1719 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1720 ) 1721 1722 def test_sent_rej_lost(self): 1723 """ 1724 L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted] 1725 Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the S-frame [REJ] sent from the IUT 1726 is lost. 1727 """ 1728 self.ertm_tx_window_size = 5 1729 cert_acl_handle = self._setup_link_from_cert() 1730 with EventCallbackStream( 1731 self.cert_device.hci_acl_manager.FetchAclData( 1732 empty_proto.Empty())) as cert_acl_data_stream: 1733 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1734 cert_acl_data_stream.register_callback(self._handle_control_packet) 1735 self.on_connection_response = self._on_connection_response_use_ertm 1736 1737 psm = 0x33 1738 scid = 0x41 1739 self._open_channel( 1740 cert_acl_data_stream, 1741 1, 1742 cert_acl_handle, 1743 scid, 1744 psm, 1745 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1746 1747 # FIXME: Order shouldn't matter here 1748 cert_acl_data_asserts.assert_event_occurs( 1749 self.is_correct_configuration_response) 1750 cert_acl_data_asserts.assert_event_occurs( 1751 self.is_correct_configuration_request) 1752 1753 dcid = self.scid_to_dcid[scid] 1754 1755 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1756 dcid, 0, l2cap_packets.Final.NOT_SET, 0, 1757 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1758 SAMPLE_PACKET) 1759 self.cert_send_b_frame(i_frame) 1760 cert_acl_data_asserts.assert_event_occurs( 1761 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 1762 ) 1763 1764 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1765 dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 1766 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1767 SAMPLE_PACKET) 1768 self.cert_send_b_frame(i_frame) 1769 cert_acl_data_asserts.assert_event_occurs( 1770 lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT 1771 ) 1772 1773 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1774 dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY, 1775 l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0) 1776 self.cert_send_b_frame(s_frame) 1777 1778 cert_acl_data_asserts.assert_event_occurs( 1779 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE) 1780 for i in range(1, self.ertm_tx_window_size): 1781 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1782 dcid, i, l2cap_packets.Final.NOT_SET, 0, 1783 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1784 SAMPLE_PACKET) 1785 self.cert_send_b_frame(i_frame) 1786 cert_acl_data_asserts.assert_event_occurs( 1787 lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1 1788 ) 1789 1790 def test_handle_duplicate_srej(self): 1791 """ 1792 L2CAP/ERM/BI-03-C [Handle Duplicate S-Frame [SREJ]] 1793 Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ. 1794 """ 1795 cert_acl_handle = self._setup_link_from_cert() 1796 with EventCallbackStream( 1797 self.cert_device.hci_acl_manager.FetchAclData( 1798 empty_proto.Empty())) as cert_acl_data_stream: 1799 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1800 cert_acl_data_stream.register_callback(self._handle_control_packet) 1801 self.on_connection_response = self._on_connection_response_use_ertm 1802 1803 psm = 0x33 1804 scid = 0x41 1805 self._open_channel( 1806 cert_acl_data_stream, 1807 1, 1808 cert_acl_handle, 1809 scid, 1810 psm, 1811 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1812 1813 # FIXME: Order shouldn't matter here 1814 cert_acl_data_asserts.assert_event_occurs( 1815 self.is_correct_configuration_response) 1816 cert_acl_data_asserts.assert_event_occurs( 1817 self.is_correct_configuration_request) 1818 1819 dcid = self.scid_to_dcid[scid] 1820 1821 self.device_under_test.l2cap.SendDynamicChannelPacket( 1822 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1823 self.device_under_test.l2cap.SendDynamicChannelPacket( 1824 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1825 cert_acl_data_asserts.assert_event_occurs( 1826 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0, 1827 timeout=timedelta(0.5)) 1828 cert_acl_data_asserts.assert_event_occurs( 1829 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1, 1830 timeout=timedelta(0.5)) 1831 cert_acl_data_asserts.assert_event_occurs( 1832 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL 1833 ) 1834 1835 # Send SREJ with F not set 1836 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1837 dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT, 1838 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0) 1839 self.cert_send_b_frame(s_frame) 1840 1841 cert_acl_data_asserts.assert_none(timeout=timedelta(seconds=0.5)) 1842 # Send SREJ with F set 1843 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1844 dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT, 1845 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1846 0) 1847 self.cert_send_b_frame(s_frame) 1848 1849 cert_acl_data_asserts.assert_event_occurs( 1850 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1851 ) 1852 1853 def test_handle_receipt_rej_and_rr_with_f_set(self): 1854 """ 1855 L2CAP/ERM/BI-04-C [Handle Receipt of S-Frame [REJ] and S-Frame [RR, F=1] that Both Require Retransmission of the Same I-Frames] 1856 Verify the IUT will only retransmit the requested I-frames once after receiving an S-frame [REJ] 1857 followed by an S-frame [RR] with the Final bit set that indicates the same I-frames should be 1858 retransmitted. 1859 """ 1860 cert_acl_handle = self._setup_link_from_cert() 1861 with EventCallbackStream( 1862 self.cert_device.hci_acl_manager.FetchAclData( 1863 empty_proto.Empty())) as cert_acl_data_stream: 1864 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1865 cert_acl_data_stream.register_callback(self._handle_control_packet) 1866 self.on_connection_response = self._on_connection_response_use_ertm 1867 1868 psm = 0x33 1869 scid = 0x41 1870 self._open_channel( 1871 cert_acl_data_stream, 1872 1, 1873 cert_acl_handle, 1874 scid, 1875 psm, 1876 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1877 1878 # FIXME: Order shouldn't matter here 1879 cert_acl_data_asserts.assert_event_occurs( 1880 self.is_correct_configuration_response) 1881 cert_acl_data_asserts.assert_event_occurs( 1882 self.is_correct_configuration_request) 1883 1884 dcid = self.scid_to_dcid[scid] 1885 1886 self.device_under_test.l2cap.SendDynamicChannelPacket( 1887 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1888 self.device_under_test.l2cap.SendDynamicChannelPacket( 1889 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1890 cert_acl_data_asserts.assert_event_occurs( 1891 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0, 1892 timeout=timedelta(0.5)) 1893 cert_acl_data_asserts.assert_event_occurs( 1894 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1, 1895 timeout=timedelta(0.5)) 1896 cert_acl_data_asserts.assert_event_occurs( 1897 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL, 1898 timeout=timedelta(2)) 1899 1900 # Send REJ with F not set 1901 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1902 dcid, l2cap_packets.SupervisoryFunction.REJECT, 1903 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0) 1904 self.cert_send_b_frame(s_frame) 1905 1906 cert_acl_data_asserts.assert_none(timeout=timedelta(seconds=0.5)) 1907 1908 # Send RR with F set 1909 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1910 dcid, l2cap_packets.SupervisoryFunction.REJECT, 1911 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1912 0) 1913 self.cert_send_b_frame(s_frame) 1914 1915 cert_acl_data_asserts.assert_event_occurs( 1916 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1917 ) 1918 cert_acl_data_asserts.assert_event_occurs( 1919 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1 1920 ) 1921 1922 def test_handle_rej_and_i_frame_with_f_set(self): 1923 """ 1924 L2CAP/ERM/BI-05-C [Handle receipt of S-Frame [REJ] and I-Frame [F=1] that Both Require Retransmission of the Same I-Frames] 1925 Verify the IUT will only retransmit the requested I-frames once after receiving an S-frame [REJ] 1926 followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted. 1927 """ 1928 cert_acl_handle = self._setup_link_from_cert() 1929 with EventCallbackStream( 1930 self.cert_device.hci_acl_manager.FetchAclData( 1931 empty_proto.Empty())) as cert_acl_data_stream: 1932 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 1933 cert_acl_data_stream.register_callback(self._handle_control_packet) 1934 self.on_connection_response = self._on_connection_response_use_ertm 1935 1936 psm = 0x33 1937 scid = 0x41 1938 self._open_channel( 1939 cert_acl_data_stream, 1940 1, 1941 cert_acl_handle, 1942 scid, 1943 psm, 1944 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 1945 1946 # FIXME: Order shouldn't matter here 1947 cert_acl_data_asserts.assert_event_occurs( 1948 self.is_correct_configuration_response) 1949 cert_acl_data_asserts.assert_event_occurs( 1950 self.is_correct_configuration_request) 1951 1952 dcid = self.scid_to_dcid[scid] 1953 1954 self.device_under_test.l2cap.SendDynamicChannelPacket( 1955 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1956 self.device_under_test.l2cap.SendDynamicChannelPacket( 1957 l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) 1958 cert_acl_data_asserts.assert_event_occurs( 1959 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0, 1960 timeout=timedelta(0.5)) 1961 cert_acl_data_asserts.assert_event_occurs( 1962 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1, 1963 timeout=timedelta(0.5)) 1964 cert_acl_data_asserts.assert_event_occurs( 1965 lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL, 1966 timeout=timedelta(2)) 1967 1968 # Send SREJ with F not set 1969 s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder( 1970 dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT, 1971 l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0) 1972 self.cert_send_b_frame(s_frame) 1973 1974 cert_acl_data_asserts.assert_none(timeout=timedelta(seconds=0.5)) 1975 1976 i_frame = l2cap_packets.EnhancedInformationFrameBuilder( 1977 dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0, 1978 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, 1979 SAMPLE_PACKET) 1980 self.cert_send_b_frame(i_frame) 1981 1982 cert_acl_data_asserts.assert_event_occurs( 1983 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0 1984 ) 1985 cert_acl_data_asserts.assert_event_occurs( 1986 lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1 1987 ) 1988 1989 def test_initiated_configurtion_request_ertm(self): 1990 """ 1991 L2CAP/CMC/BV-01-C [IUT Initiated Configuration of Enhanced Retransmission Mode] 1992 Verify the IUT can send a Configuration Request command containing the F&EC option that specifies 1993 Enhanced Retransmission Mode. 1994 """ 1995 cert_acl_handle = self._setup_link_from_cert() 1996 with EventCallbackStream( 1997 self.cert_device.hci_acl_manager.FetchAclData( 1998 empty_proto.Empty())) as cert_acl_data_stream: 1999 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 2000 cert_acl_data_stream.register_callback(self._handle_control_packet) 2001 self.on_connection_response = self._on_connection_response_use_ertm 2002 2003 psm = 0x33 2004 scid = 0x41 2005 self._open_channel( 2006 cert_acl_data_stream, 2007 1, 2008 cert_acl_handle, 2009 scid, 2010 psm, 2011 mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) 2012 2013 # TODO: Fix this test. It doesn't work so far with PDL struct 2014 2015 cert_acl_data_asserts.assert_event_occurs( 2016 self.is_correct_configuration_request) 2017 asserts.skip("Struct not working") 2018 2019 def test_respond_configuration_request_ertm(self): 2020 """ 2021 L2CAP/CMC/BV-02-C [Lower Tester Initiated Configuration of Enhanced Retransmission Mode] 2022 Verify the IUT can accept a Configuration Request from the Lower Tester containing an F&EC option 2023 that specifies Enhanced Retransmission Mode. 2024 """ 2025 cert_acl_handle = self._setup_link_from_cert() 2026 with EventCallbackStream( 2027 self.cert_device.hci_acl_manager.FetchAclData( 2028 empty_proto.Empty())) as cert_acl_data_stream: 2029 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 2030 cert_acl_data_stream.register_callback(self._handle_control_packet) 2031 psm = 1 2032 scid = 0x0101 2033 self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM 2034 self.device_under_test.l2cap.SetDynamicChannel( 2035 l2cap_facade_pb2.SetEnableDynamicChannelRequest( 2036 psm=psm, retransmission_mode=self.retransmission_mode)) 2037 2038 open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid) 2039 open_channel_l2cap = l2cap_packets.BasicFrameBuilder( 2040 1, open_channel) 2041 self.cert_send_b_frame(open_channel_l2cap) 2042 2043 # TODO: Verify that the type should be ERTM 2044 cert_acl_data_asserts.assert_event_occurs( 2045 self.is_correct_configuration_response) 2046