#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Matcher factories for Bluetooth HCI, L2CAP, security, advertising, scanning and ISO events.

Most public factories return a one-argument callable that is applied to a
received event/packet and yields a truthy value on match.  ``Extract*``
helpers return the parsed view (or ``None``) instead of a boolean.
"""

import bluetooth_packets_python3 as bt_packets
import logging

from bluetooth_packets_python3 import hci_packets
from bluetooth_packets_python3.hci_packets import EventCode
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult


class HciMatchers(object):
    """Matchers and extractors operating on serialized HCI event bytes.

    The lambda-returning factories read the raw bytes from ``msg.payload``;
    the ``Extract*`` helpers take the raw bytes directly.
    """

    @staticmethod
    def CommandComplete(opcode):
        """Match a COMMAND_COMPLETE event, optionally for a specific opcode."""
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
        """Return the CommandCompleteView for packet_bytes, or None if it does not match."""
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_complete(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_complete(packet_bytes, opcode=None):
        """Parse packet_bytes as COMMAND_COMPLETE; a None opcode accepts any opcode."""
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_COMPLETE)
        if event is None:
            return None
        complete = hci_packets.CommandCompleteView(event)
        if opcode is None or complete is None:
            return complete
        if complete.GetCommandOpCode() != opcode:
            return None
        return complete

    @staticmethod
    def CommandStatus(opcode=None):
        """Match a COMMAND_STATUS event, optionally for a specific opcode."""
        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
        """Return the CommandStatusView for packet_bytes, or None if it does not match."""
        # Fixed: this previously delegated to the COMMAND_COMPLETE extractor,
        # so it could never return a command status view.
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_status(packet_bytes, opcode=None):
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_status(packet_bytes, opcode=None):
        """Parse packet_bytes as COMMAND_STATUS; a None opcode accepts any opcode."""
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
        if event is None:
            return None
        status = hci_packets.CommandStatusView(event)
        if opcode is None or status is None:
            return status
        if status.GetCommandOpCode() != opcode:
            return None
        return status

    @staticmethod
    def EventWithCode(event_code):
        """Match any HCI event whose event code equals event_code."""
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)

    @staticmethod
    def ExtractEventWithCode(packet_bytes, event_code):
        """Return the EventView for packet_bytes, or None if the event code differs."""
        return HciMatchers._extract_matching_event(packet_bytes, event_code)

    @staticmethod
    def _is_matching_event(packet_bytes, event_code):
        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None

    @staticmethod
    def _extract_matching_event(packet_bytes, event_code):
        """Wrap packet_bytes in an EventView; a None event_code accepts any event."""
        event = hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if event is None:
            return None
        if event_code is not None and event.GetEventCode() != event_code:
            return None
        return event

    @staticmethod
    def LeEventWithCode(subevent_code):
        """Match an LE_META_EVENT carrying the given subevent code."""
        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None

    @staticmethod
    def ExtractLeEventWithCode(packet_bytes, subevent_code):
        """Return the LeMetaEventView for packet_bytes, or None if it does not match."""
        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)

    @staticmethod
    def _extract_matching_le_event(packet_bytes, subevent_code):
        inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)
        if inner_event is None:
            return None
        event = hci_packets.LeMetaEventView(inner_event)
        if event.GetSubeventCode() != subevent_code:
            return None
        return event

    @staticmethod
    def LeConnectionComplete():
        """Match either an LE connection complete or an LE enhanced connection complete."""
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None

    @staticmethod
    def ExtractLeConnectionComplete(packet_bytes):
        """Return the (enhanced) LE connection complete view, or None."""
        return HciMatchers._extract_le_connection_complete(packet_bytes)

    @staticmethod
    def _extract_le_connection_complete(packet_bytes):
        # Try the legacy subevent first, then the enhanced variant.
        inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeConnectionCompleteView(inner_event)

        inner_event = HciMatchers._extract_matching_le_event(packet_bytes,
                                                             hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeEnhancedConnectionCompleteView(inner_event)

        return None

    @staticmethod
    def LogEventCode():
        """Return a callable that logs the event code of each event and returns None (never matches)."""

        def _log_event_code(event):
            view = hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.payload)))
            logging.info("Received event: %x" % view.GetEventCode())

        return _log_event_code

    # NOTE: the event-code matchers below used to return
    # ``lambda event: HciMatchers.EventWithCode(code)``, i.e. a lambda that
    # returned another (always truthy) lambda and therefore matched every
    # event.  They now actually inspect ``event.payload``.  The EventCode
    # attribute lookup is kept inside the lambda so it is still resolved at
    # match time, exactly as before.

    @staticmethod
    def LinkKeyRequest():
        """Match a LINK_KEY_REQUEST event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.LINK_KEY_REQUEST)

    @staticmethod
    def IoCapabilityRequest():
        """Match an IO_CAPABILITY_REQUEST event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.IO_CAPABILITY_REQUEST)

    @staticmethod
    def IoCapabilityResponse():
        """Match an IO_CAPABILITY_RESPONSE event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.IO_CAPABILITY_RESPONSE)

    @staticmethod
    def UserPasskeyNotification():
        """Match a USER_PASSKEY_NOTIFICATION event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.USER_PASSKEY_NOTIFICATION)

    @staticmethod
    def UserPasskeyRequest():
        """Match a USER_PASSKEY_REQUEST event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.USER_PASSKEY_REQUEST)

    @staticmethod
    def UserConfirmationRequest():
        """Match a USER_CONFIRMATION_REQUEST event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.USER_CONFIRMATION_REQUEST)

    @staticmethod
    def RemoteHostSupportedFeaturesNotification():
        """Match a REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION event."""
        return lambda event: HciMatchers._is_matching_event(event.payload,
                                                            EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)

    @staticmethod
    def LinkKeyNotification():
        """Match a LINK_KEY_NOTIFICATION event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.LINK_KEY_NOTIFICATION)

    @staticmethod
    def SimplePairingComplete():
        """Match a SIMPLE_PAIRING_COMPLETE event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.SIMPLE_PAIRING_COMPLETE)

    @staticmethod
    def Disconnect():
        """Match an event with code EventCode.DISCONNECT."""
        # NOTE(review): confirm EventCode.DISCONNECT exists in hci_packets;
        # DisconnectionComplete() below may be the matcher callers want.
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.DISCONNECT)

    @staticmethod
    def DisconnectionComplete():
        """Match a DISCONNECTION_COMPLETE event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.DISCONNECTION_COMPLETE)

    @staticmethod
    def RemoteOobDataRequest():
        """Match a REMOTE_OOB_DATA_REQUEST event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.REMOTE_OOB_DATA_REQUEST)

    @staticmethod
    def PinCodeRequest():
        """Match a PIN_CODE_REQUEST event."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.PIN_CODE_REQUEST)

    @staticmethod
    def LoopbackOf(packet):
        """Match an event whose payload equals packet wrapped in a LoopbackCommandBuilder."""
        return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet))

    @staticmethod
    def Exactly(packet):
        """Match an event whose payload equals the serialized bytes of packet."""
        # Serialize once, outside the lambda, so every comparison reuses it.
        data = bytes(packet.Serialize())
        return lambda event: data == event.payload


class AdvertisingMatchers(object):
    """Matchers for advertising callback messages; None arguments act as wildcards."""

    @staticmethod
    def AdvertisingCallbackMsg(type, advertiser_id=None, status=None, data=None):
        """Match on message_type and, when given, advertiser_id, status and data."""
        return lambda event: (event.message_type == type and
                              (advertiser_id is None or advertiser_id == event.advertiser_id) and
                              (status is None or status == event.status) and (data is None or data == event.data))

    @staticmethod
    def AddressMsg(type, advertiser_id=None, address=None):
        """Match on message_type and, when given, advertiser_id and address."""
        return lambda event: (event.message_type == type and
                              (advertiser_id is None or advertiser_id == event.advertiser_id) and
                              (address is None or address == event.address))


class ScanningMatchers(object):
    """Matchers for scanning callback messages; None arguments act as wildcards."""

    @staticmethod
    def ScanningCallbackMsg(type, status=None, data=None):
        """Match on message_type and, when given, status and data."""
        return lambda event: (event.message_type == type and (status is None or status == event.status) and
                              (data is None or data == event.data))


class NeighborMatchers(object):
    """Matchers for inquiry result events; the raw bytes are read from ``msg.packet``."""

    @staticmethod
    def InquiryResult(address):
        """Match an INQUIRY_RESULT event containing a response from address."""
        return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address)

    @staticmethod
    def _is_matching_inquiry_result(packet, address):
        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT)
        if hci_event is None:
            return False
        inquiry_view = hci_packets.InquiryResultView(hci_event)
        if inquiry_view is None:
            return False
        return any(address == result.bd_addr for result in inquiry_view.GetResponses())

    @staticmethod
    def InquiryResultwithRssi(address):
        """Match an INQUIRY_RESULT_WITH_RSSI event containing a response from address."""
        return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)

    @staticmethod
    def _is_matching_inquiry_result_with_rssi(packet, address):
        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT_WITH_RSSI)
        if hci_event is None:
            return False
        inquiry_view = hci_packets.InquiryResultWithRssiView(hci_event)
        if inquiry_view is None:
            return False
        # Unlike the plain inquiry result, these responses expose `.address`.
        return any(address == result.address for result in inquiry_view.GetResponses())

    @staticmethod
    def ExtendedInquiryResult(address):
        """Match an EXTENDED_INQUIRY_RESULT event from address."""
        return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)

    @staticmethod
    def _is_matching_extended_inquiry_result(packet, address):
        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.EXTENDED_INQUIRY_RESULT)
        if hci_event is None:
            return False
        extended_view = hci_packets.ExtendedInquiryResultView(hci_event)
        if extended_view is None:
            return False
        return address == extended_view.GetAddress()


class L2capMatchers(object):
    """Matchers and extractors for L2CAP control frames, data frames and ERTM frames."""

    @staticmethod
    def ConnectionRequest(psm):
        """Match a CONNECTION_REQUEST for the given PSM."""
        return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm)

    @staticmethod
    def ConnectionResponse(scid):
        """Match a successful CONNECTION_RESPONSE for scid with a non-zero destination CID."""
        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)

    @staticmethod
    def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS):
        """Match a CONFIGURATION_RESPONSE with the given result."""
        return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result)

    @staticmethod
    def ConfigurationRequest(cid=None):
        """Match a CONFIGURATION_REQUEST, optionally for a specific destination CID."""
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid)

    @staticmethod
    def ConfigurationRequestWithErtm():
        """Match a CONFIGURATION_REQUEST whose bytes contain the ERTM option pattern."""
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet)

    @staticmethod
    def ConfigurationRequestView(dcid):
        """Match an already-parsed configuration request view by destination CID."""
        return lambda request_view: request_view.GetDestinationCid() == dcid

    @staticmethod
    def DisconnectionRequest(scid, dcid):
        """Match a DISCONNECTION_REQUEST with the given source and destination CIDs."""
        return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid)

    @staticmethod
    def DisconnectionResponse(scid, dcid):
        """Match a DISCONNECTION_RESPONSE with the given source and destination CIDs."""
        return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)

    @staticmethod
    def EchoResponse():
        """Match an ECHO_RESPONSE control frame."""
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE)

    @staticmethod
    def CommandReject():
        """Match a COMMAND_REJECT control frame."""
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)

    @staticmethod
    def LeCommandReject():
        """Match an LE COMMAND_REJECT control frame."""
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def LeConnectionParameterUpdateRequest():
        """Match an LE CONNECTION_PARAMETER_UPDATE_REQUEST control frame."""
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(
            packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST)

    @staticmethod
    def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED):
        """Match an LE CONNECTION_PARAMETER_UPDATE_RESPONSE with the given result."""
        return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        """Match an LE_CREDIT_BASED_CONNECTION_REQUEST for the given LE PSM."""
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)

    @staticmethod
    def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS):
        """Match an LE_CREDIT_BASED_CONNECTION_RESPONSE with the given result."""
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result)

    @staticmethod
    def CreditBasedConnectionResponseUsedCid():
        """Match a SOURCE_CID_ALREADY_ALLOCATED response or an LE COMMAND_REJECT."""
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(
            packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED
        ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def LeDisconnectionRequest(scid, dcid):
        """Match an LE DISCONNECTION_REQUEST with the given source and destination CIDs."""
        return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)

    @staticmethod
    def LeDisconnectionResponse(scid, dcid):
        """Match an LE DISCONNECTION_RESPONSE with the given source and destination CIDs."""
        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)

    @staticmethod
    def LeFlowControlCredit(cid):
        """Match an LE_FLOW_CONTROL_CREDIT frame for the given CID."""
        return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid)

    @staticmethod
    def SFrame(req_seq=None, f=None, s=None, p=None):
        """Match a supervisory frame; None fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)

    @staticmethod
    def IFrame(tx_seq=None, payload=None, f=None):
        """Match an information frame (no FCS); None fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False)

    @staticmethod
    def IFrameWithFcs(tx_seq=None, payload=None, f=None):
        """Match an information frame with FCS; None fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True)

    @staticmethod
    def IFrameStart(tx_seq=None, payload=None, f=None):
        """Match an information start frame (no FCS); None fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False)

    @staticmethod
    def Data(payload):
        """Match a frame whose payload bytes equal payload."""
        return lambda packet: packet.GetPayload().GetBytes() == payload

    @staticmethod
    def FirstLeIFrame(payload, sdu_size):
        """Match a first LE information frame with the given payload and SDU length."""
        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)

    # this is a hack - should be removed
    @staticmethod
    def PartialData(payload):
        """Match a frame whose payload bytes contain payload."""
        return lambda packet: payload in packet.GetPayload().GetBytes()

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadRawData(payload):
        """Match a packet whose raw ``payload`` attribute contains payload."""
        return lambda packet: payload in packet.payload

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingPsm(psm):
        """Return the packet itself when its ``psm`` matches, else None."""
        return lambda packet: None if psm != packet.psm else packet

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingCid(cid):
        """Return the packet itself when its ``fixed_cid`` matches, else None."""
        return lambda packet: None if cid != packet.fixed_cid else packet

    @staticmethod
    def ExtractBasicFrame(scid):
        """Return a callable yielding the BasicFrameView for channel scid, or None."""
        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)

    @staticmethod
    def ExtractBasicFrameWithFcs(scid):
        """Return a callable yielding the BasicFrameWithFcsView for channel scid, or None."""
        return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid)

    @staticmethod
    def InformationRequestWithType(info_type):
        """Return a callable yielding the InformationRequestView of the given type, or None."""
        return lambda packet: L2capMatchers._information_request_with_type(packet, info_type)

    @staticmethod
    def InformationResponseExtendedFeatures(supports_ertm=None,
                                            supports_streaming=None,
                                            supports_fcs=None,
                                            supports_fixed_channels=None):
        """Match an extended-features information response; None fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_response_extended_features(
            packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)

    @staticmethod
    def _basic_frame(packet):
        """Parse ``packet.payload`` as a BasicFrameView; None in, None out."""
        if packet is None:
            return None
        return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _basic_frame_with_fcs(packet):
        """Parse ``packet.payload`` as a BasicFrameWithFcsView; None in, None out."""
        if packet is None:
            return None
        return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _basic_frame_for(packet, scid):
        """Return the basic frame if it is addressed to channel scid, else None."""
        frame = L2capMatchers._basic_frame(packet)
        # _basic_frame() returns None for a None packet; do not dereference it.
        if frame is None or frame.GetChannelId() != scid:
            return None
        return frame

    @staticmethod
    def _basic_frame_with_fcs_for(packet, scid):
        """Return the FCS basic frame if it is addressed to channel scid, else None."""
        # The channel id is read from the plain basic-frame view, then the
        # packet is re-parsed with the FCS-aware view.
        frame = L2capMatchers._basic_frame(packet)
        if frame is None or frame.GetChannelId() != scid:
            return None
        return L2capMatchers._basic_frame_with_fcs(packet)

    @staticmethod
    def _information_frame(packet):
        """Return an EnhancedInformationFrameView, or None if packet is not an I-frame."""
        standard_frame = l2cap_packets.StandardFrameView(packet)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_frame)

    @staticmethod
    def _information_frame_with_fcs(packet):
        """Return an EnhancedInformationFrameWithFcsView, or None if packet is not an I-frame."""
        standard_frame = l2cap_packets.StandardFrameWithFcsView(packet)
        if standard_frame is None:
            return None
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame)

    @staticmethod
    def _information_start_frame(packet):
        """Return an EnhancedInformationStartFrameView, or None if packet is not an I-frame."""
        start_frame = L2capMatchers._information_frame(packet)
        if start_frame is None:
            return None
        return l2cap_packets.EnhancedInformationStartFrameView(start_frame)

    @staticmethod
    def _information_start_frame_with_fcs(packet):
        """Return an EnhancedInformationStartFrameWithFcsView, or None if packet is not an I-frame."""
        start_frame = L2capMatchers._information_frame_with_fcs(packet)
        if start_frame is None:
            return None
        return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame)

    @staticmethod
    def _supervisory_frame(packet):
        """Return an EnhancedSupervisoryFrameView, or None if packet is not an S-frame."""
        standard_frame = l2cap_packets.StandardFrameView(packet)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)

    @staticmethod
    def _information_frame_fields_match(frame, tx_seq, payload, f):
        """Check the optional tx_seq / payload / F-bit expectations against a parsed I-frame."""
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload().GetBytes() != payload:
            return False
        if f is not None and frame.GetF() != f:
            return False
        return True

    @staticmethod
    def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False):
        if fcs:
            frame = L2capMatchers._information_frame_with_fcs(packet)
        else:
            frame = L2capMatchers._information_frame(packet)
        return L2capMatchers._information_frame_fields_match(frame, tx_seq, payload, f)

    @staticmethod
    def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False):
        if fcs:
            frame = L2capMatchers._information_start_frame_with_fcs(packet)
        else:
            frame = L2capMatchers._information_start_frame(packet)
        return L2capMatchers._information_frame_fields_match(frame, tx_seq, payload, f)

    @staticmethod
    def _is_matching_supervisory_frame(packet, req_seq, f, s, p):
        frame = L2capMatchers._supervisory_frame(packet)
        if frame is None:
            return False
        if req_seq is not None and frame.GetReqSeq() != req_seq:
            return False
        if f is not None and frame.GetF() != f:
            return False
        if s is not None and frame.GetS() != s:
            return False
        if p is not None and frame.GetP() != p:
            return False
        return True

    @staticmethod
    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
        return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size

    @staticmethod
    def _control_frame(packet):
        """Return a ControlView of the payload when the frame is on channel id 1, else None."""
        if packet.GetChannelId() != 1:
            return None
        return l2cap_packets.ControlView(packet.GetPayload())

    @staticmethod
    def _le_control_frame(packet):
        """Return an LeControlView of the payload when the frame is on channel id 5, else None."""
        if packet.GetChannelId() != 5:
            return None
        return l2cap_packets.LeControlView(packet.GetPayload())

    @staticmethod
    def control_frame_with_code(packet, code):
        """Return the control frame if its command code equals code, else None."""
        frame = L2capMatchers._control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def le_control_frame_with_code(packet, code):
        """Return the LE control frame if its command code equals code, else None."""
        frame = L2capMatchers._le_control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def _is_control_frame_with_code(packet, code):
        return L2capMatchers.control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_le_control_frame_with_code(packet, code):
        return L2capMatchers.le_control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_matching_connection_request(packet, psm):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConnectionRequestView(frame)
        return request.GetPsm() == psm

    @staticmethod
    def _is_matching_connection_response(packet, scid):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConnectionResponseView(frame)
        return (response.GetSourceCid() == scid and response.GetResult() == ConnectionResponseResult.SUCCESS and
                response.GetDestinationCid() != 0)

    @staticmethod
    def _is_matching_configuration_request_with_cid(packet, cid=None):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        dcid = request.GetDestinationCid()
        return cid is None or cid == dcid

    @staticmethod
    def _is_matching_configuration_request_with_ertm(packet):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        config_bytes = request.GetBytes()
        # TODO(b/153189503): Use packet struct parser.
        # Presumably option type 0x04, length 0x09, mode 0x03 (ERTM) - a raw
        # substring search, so it could also match unrelated bytes.
        return b"\x04\x09\x03" in config_bytes

    @staticmethod
    def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConfigurationResponseView(frame)
        return response.GetResult() == result

    @staticmethod
    def _is_matching_disconnection_request(packet, scid, dcid):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.DisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.DisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_disconnection_response(packet, scid, dcid):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeDisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_disconnection_request(packet, scid, dcid):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeDisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_flow_control_credit(packet, cid):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT)
        if frame is None:
            return False
        request = l2cap_packets.LeFlowControlCreditView(frame)
        return request.GetCid() == cid

    @staticmethod
    def _information_request_with_type(packet, info_type):
        """Return the InformationRequestView if its info type equals info_type, else None."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST)
        if frame is None:
            return None
        request = l2cap_packets.InformationRequestView(frame)
        if request.GetInfoType() != info_type:
            return None
        return request

    @staticmethod
    def _information_response_with_type(packet, info_type):
        """Return the InformationResponseView if its info type equals info_type, else None."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE)
        if frame is None:
            return None
        response = l2cap_packets.InformationResponseView(frame)
        if response.GetInfoType() != info_type:
            return None
        return response

    @staticmethod
    def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs,
                                                            supports_fixed_channels):
        frame = L2capMatchers._information_response_with_type(packet,
                                                              InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
        if frame is None:
            return False
        features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
        if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm:
            return False
        # Fixed: GetStreamingMode was compared without being called, so any
        # non-None supports_streaming made the matcher fail unconditionally.
        if supports_streaming is not None and features.GetStreamingMode() != supports_streaming:
            return False
        if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
            return False
        if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels:
            return False
        return True

    @staticmethod
    def _is_matching_connection_parameter_update_response(packet, result):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE)
        if frame is None:
            return False
        return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result

    @staticmethod
    def _is_matching_credit_based_connection_request(packet, psm):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
        return request.GetLePsm() == psm

    @staticmethod
    def _is_matching_credit_based_connection_response(packet, result):
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeCreditBasedConnectionResponseView(frame)
        # A SUCCESS response must additionally carry a non-zero destination CID.
        return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or
                                                   response.GetDestinationCid() != 0)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        """Match a callback event whose ``event_type`` equals type."""
        return lambda event: event.event_type == type


class SecurityMatchers(object):
    """Matchers for security module messages; None arguments act as wildcards."""

    @staticmethod
    def UiMsg(type, address=None):
        """Match on message_type and, when given, the peer address."""
        return lambda event: event.message_type == type and (address is None or address == event.peer)

    @staticmethod
    def BondMsg(type, address=None, reason=None):
        """Match on message_type and, when given, the peer address and reason."""
        return lambda event: (event.message_type == type and (address is None or address == event.peer) and
                              (reason is None or reason == event.reason))

    @staticmethod
    def HelperMsg(type, address=None):
        """Match on message_type and, when given, the peer address."""
        return lambda event: event.message_type == type and (address is None or address == event.peer)


class IsoMatchers(object):
    """Matchers for ISO data packets."""

    @staticmethod
    def Data(payload):
        """Match a packet whose ``payload`` equals payload."""
        return lambda packet: packet.payload == payload

    @staticmethod
    def PacketPayloadWithMatchingCisHandle(cis_handle):
        """Return the packet itself when its ``handle`` matches cis_handle, else None."""
        return lambda packet: None if cis_handle != packet.handle else packet