1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import bluetooth_packets_python3 as bt_packets 19import logging 20 21from bluetooth_packets_python3 import hci_packets 22from bluetooth_packets_python3.hci_packets import EventCode 23from bluetooth_packets_python3 import l2cap_packets 24from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode 25from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult 26from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult 27from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType 28from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult 29 30 31class HciMatchers(object): 32 33 @staticmethod 34 def CommandComplete(opcode): 35 return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode) 36 37 @staticmethod 38 def ExtractMatchingCommandComplete(packet_bytes, opcode=None): 39 return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) 40 41 @staticmethod 42 def _is_matching_command_complete(packet_bytes, opcode=None): 43 return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None 44 45 @staticmethod 46 def _extract_matching_command_complete(packet_bytes, opcode=None): 47 event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_COMPLETE) 48 if event is None: 49 return None 50 complete = hci_packets.CommandCompleteView(event) 51 if opcode is None or complete is None: 52 return complete 53 else: 54 if complete.GetCommandOpCode() != opcode: 55 return None 56 else: 57 return complete 58 59 @staticmethod 60 def CommandStatus(opcode=None): 61 return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode) 62 63 @staticmethod 64 def ExtractMatchingCommandStatus(packet_bytes, opcode=None): 65 return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) 66 67 @staticmethod 68 def _is_matching_command_status(packet_bytes, opcode=None): 69 return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None 70 71 @staticmethod 72 def _extract_matching_command_status(packet_bytes, opcode=None): 73 event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS) 74 if event is None: 75 return None 76 complete = hci_packets.CommandStatusView(event) 77 if opcode is None or complete is None: 78 return complete 79 else: 80 if complete.GetCommandOpCode() != opcode: 81 return None 82 else: 83 return complete 84 85 @staticmethod 86 def EventWithCode(event_code): 87 return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code) 88 89 @staticmethod 90 def ExtractEventWithCode(packet_bytes, event_code): 91 return HciMatchers._extract_matching_event(packet_bytes, event_code) 92 93 @staticmethod 94 def _is_matching_event(packet_bytes, event_code): 95 return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None 96 97 @staticmethod 98 def _extract_matching_event(packet_bytes, event_code): 99 event = hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes))) 100 if event is None: 101 return None 102 if event_code is not None and event.GetEventCode() != event_code: 103 return None 104 return event 105 106 @staticmethod 107 def LeEventWithCode(subevent_code): 108 return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None 109 110 @staticmethod 111 def ExtractLeEventWithCode(packet_bytes, subevent_code): 112 return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code) 113 114 @staticmethod 115 def _extract_matching_le_event(packet_bytes, subevent_code): 116 inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT) 117 if inner_event is None: 118 return None 119 event = hci_packets.LeMetaEventView(inner_event) 120 if event.GetSubeventCode() != subevent_code: 121 return None 122 return event 123 124 @staticmethod 125 def LeConnectionComplete(): 126 return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None 127 128 @staticmethod 129 def ExtractLeConnectionComplete(packet_bytes): 130 return HciMatchers._extract_le_connection_complete(packet_bytes) 131 132 @staticmethod 133 def _extract_le_connection_complete(packet_bytes): 134 inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE) 135 if inner_event is not None: 136 return hci_packets.LeConnectionCompleteView(inner_event) 137 138 inner_event = HciMatchers._extract_matching_le_event(packet_bytes, 139 hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE) 140 if inner_event is not None: 141 return hci_packets.LeEnhancedConnectionCompleteView(inner_event) 142 143 return None 144 145 @staticmethod 146 def LogEventCode(): 147 return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode()) 148 149 @staticmethod 150 def LinkKeyRequest(): 151 return lambda event: HciMatchers.EventWithCode(EventCode.LINK_KEY_REQUEST) 152 153 @staticmethod 154 def IoCapabilityRequest(): 155 return lambda event: HciMatchers.EventWithCode(EventCode.IO_CAPABILITY_REQUEST) 156 157 @staticmethod 158 def IoCapabilityResponse(): 159 return lambda event: HciMatchers.EventWithCode(EventCode.IO_CAPABILITY_RESPONSE) 160 161 @staticmethod 162 def UserPasskeyNotification(): 163 return lambda event: HciMatchers.EventWithCode(EventCode.USER_PASSKEY_NOTIFICATION) 164 165 @staticmethod 166 def UserPasskeyRequest(): 167 return lambda event: HciMatchers.EventWithCode(EventCode.USER_PASSKEY_REQUEST) 168 169 @staticmethod 170 def UserConfirmationRequest(): 171 return lambda event: HciMatchers.EventWithCode(EventCode.USER_CONFIRMATION_REQUEST) 172 173 @staticmethod 174 def RemoteHostSupportedFeaturesNotification(): 175 return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION) 176 177 @staticmethod 178 def LinkKeyNotification(): 179 return lambda event: HciMatchers.EventWithCode(EventCode.LINK_KEY_NOTIFICATION) 180 181 @staticmethod 182 def SimplePairingComplete(): 183 return lambda event: HciMatchers.EventWithCode(EventCode.SIMPLE_PAIRING_COMPLETE) 184 185 @staticmethod 186 def Disconnect(): 187 return lambda event: HciMatchers.EventWithCode(EventCode.DISCONNECT) 188 189 @staticmethod 190 def DisconnectionComplete(): 191 return lambda event: HciMatchers.EventWithCode(EventCode.DISCONNECTION_COMPLETE) 192 193 @staticmethod 194 def RemoteOobDataRequest(): 195 return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST) 196 197 @staticmethod 198 def PinCodeRequest(): 199 return lambda event: HciMatchers.EventWithCode(EventCode.PIN_CODE_REQUEST) 200 201 @staticmethod 202 def LoopbackOf(packet): 203 return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet)) 204 205 @staticmethod 206 def Exactly(packet): 207 data = bytes(packet.Serialize()) 208 return lambda event: data == event.payload 209 210 211class NeighborMatchers(object): 212 213 @staticmethod 214 def InquiryResult(address): 215 return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address) 216 217 @staticmethod 218 def _is_matching_inquiry_result(packet, address): 219 hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT) 220 if hci_event is None: 221 return False 222 inquiry_view = hci_packets.InquiryResultView(hci_event) 223 if inquiry_view is None: 224 return False 225 results = inquiry_view.GetInquiryResults() 226 return any((address == result.bd_addr for result in results)) 227 228 @staticmethod 229 def InquiryResultwithRssi(address): 230 return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address) 231 232 @staticmethod 233 def _is_matching_inquiry_result_with_rssi(packet, address): 234 hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT_WITH_RSSI) 235 if hci_event is None: 236 return False 237 inquiry_view = hci_packets.InquiryResultWithRssiView(hci_event) 238 if inquiry_view is None: 239 return False 240 results = inquiry_view.GetInquiryResults() 241 return any((address == result.address for result in results)) 242 243 @staticmethod 244 def ExtendedInquiryResult(address): 245 return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address) 246 247 @staticmethod 248 def _is_matching_extended_inquiry_result(packet, address): 249 hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.EXTENDED_INQUIRY_RESULT) 250 if hci_event is None: 251 return False 252 extended_view = hci_packets.ExtendedInquiryResultView(hci_event) 253 if extended_view is None: 254 return False 255 return address == extended_view.GetAddress() 256 257 258class L2capMatchers(object): 259 260 @staticmethod 261 def ConnectionRequest(psm): 262 return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm) 263 264 @staticmethod 265 def ConnectionResponse(scid): 266 return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid) 267 268 @staticmethod 269 def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS): 270 return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result) 271 272 @staticmethod 273 def ConfigurationRequest(cid=None): 274 return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid) 275 276 @staticmethod 277 def ConfigurationRequestWithErtm(): 278 return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet) 279 280 @staticmethod 281 def ConfigurationRequestView(dcid): 282 return lambda request_view: request_view.GetDestinationCid() == dcid 283 284 @staticmethod 285 def DisconnectionRequest(scid, dcid): 286 return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid) 287 288 @staticmethod 289 def DisconnectionResponse(scid, dcid): 290 return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid) 291 292 @staticmethod 293 def EchoResponse(): 294 return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE) 295 296 @staticmethod 297 def CommandReject(): 298 return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) 299 300 @staticmethod 301 def LeCommandReject(): 302 return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT) 303 304 @staticmethod 305 def LeConnectionParameterUpdateRequest(): 306 return lambda packet: L2capMatchers._is_le_control_frame_with_code( 307 packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST) 308 309 @staticmethod 310 def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED): 311 return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result) 312 313 @staticmethod 314 def CreditBasedConnectionRequest(psm): 315 return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm) 316 317 @staticmethod 318 def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS): 319 return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result) 320 321 @staticmethod 322 def CreditBasedConnectionResponseUsedCid(): 323 return lambda packet: L2capMatchers._is_matching_credit_based_connection_response( 324 packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED 325 ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT) 326 327 @staticmethod 328 def LeDisconnectionRequest(scid, dcid): 329 return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid) 330 331 @staticmethod 332 def LeDisconnectionResponse(scid, dcid): 333 return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid) 334 335 @staticmethod 336 def LeFlowControlCredit(cid): 337 return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid) 338 339 @staticmethod 340 def SFrame(req_seq=None, f=None, s=None, p=None): 341 return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p) 342 343 @staticmethod 344 def IFrame(tx_seq=None, payload=None, f=None): 345 return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False) 346 347 @staticmethod 348 def IFrameWithFcs(tx_seq=None, payload=None, f=None): 349 return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True) 350 351 @staticmethod 352 def IFrameStart(tx_seq=None, payload=None, f=None): 353 return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False) 354 355 @staticmethod 356 def Data(payload): 357 return lambda packet: packet.GetPayload().GetBytes() == payload 358 359 @staticmethod 360 def FirstLeIFrame(payload, sdu_size): 361 return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size) 362 363 # this is a hack - should be removed 364 @staticmethod 365 def PartialData(payload): 366 return lambda packet: payload in packet.GetPayload().GetBytes() 367 368 # this is a hack - should be removed 369 @staticmethod 370 def PacketPayloadRawData(payload): 371 return lambda packet: payload in packet.payload 372 373 # this is a hack - should be removed 374 @staticmethod 375 def PacketPayloadWithMatchingPsm(psm): 376 return lambda packet: None if psm != packet.psm else packet 377 378 # this is a hack - should be removed 379 @staticmethod 380 def PacketPayloadWithMatchingCid(cid): 381 return lambda packet: None if cid != packet.fixed_cid else packet 382 383 @staticmethod 384 def ExtractBasicFrame(scid): 385 return lambda packet: L2capMatchers._basic_frame_for(packet, scid) 386 387 @staticmethod 388 def ExtractBasicFrameWithFcs(scid): 389 return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid) 390 391 @staticmethod 392 def InformationRequestWithType(info_type): 393 return lambda packet: L2capMatchers._information_request_with_type(packet, info_type) 394 395 @staticmethod 396 def InformationResponseExtendedFeatures(supports_ertm=None, 397 supports_streaming=None, 398 supports_fcs=None, 399 supports_fixed_channels=None): 400 return lambda packet: L2capMatchers._is_matching_information_response_extended_features( 401 packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels) 402 403 @staticmethod 404 def _basic_frame(packet): 405 if packet is None: 406 return None 407 return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload))) 408 409 @staticmethod 410 def _basic_frame_with_fcs(packet): 411 if packet is None: 412 return None 413 return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload))) 414 415 @staticmethod 416 def _basic_frame_for(packet, scid): 417 frame = L2capMatchers._basic_frame(packet) 418 if frame.GetChannelId() != scid: 419 return None 420 return frame 421 422 @staticmethod 423 def _basic_frame_with_fcs_for(packet, scid): 424 frame = L2capMatchers._basic_frame(packet) 425 if frame.GetChannelId() != scid: 426 return None 427 frame = L2capMatchers._basic_frame_with_fcs(packet) 428 if frame is None: 429 return None 430 return frame 431 432 @staticmethod 433 def _information_frame(packet): 434 standard_frame = l2cap_packets.StandardFrameView(packet) 435 if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: 436 return None 437 return l2cap_packets.EnhancedInformationFrameView(standard_frame) 438 439 @staticmethod 440 def _information_frame_with_fcs(packet): 441 standard_frame = l2cap_packets.StandardFrameWithFcsView(packet) 442 if standard_frame is None: 443 return None 444 if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: 445 return None 446 return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame) 447 448 @staticmethod 449 def _information_start_frame(packet): 450 start_frame = L2capMatchers._information_frame(packet) 451 if start_frame is None: 452 return None 453 return l2cap_packets.EnhancedInformationStartFrameView(start_frame) 454 455 @staticmethod 456 def _information_start_frame_with_fcs(packet): 457 start_frame = L2capMatchers._information_frame_with_fcs(packet) 458 if start_frame is None: 459 return None 460 return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame) 461 462 @staticmethod 463 def _supervisory_frame(packet): 464 standard_frame = l2cap_packets.StandardFrameView(packet) 465 if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME: 466 return None 467 return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame) 468 469 @staticmethod 470 def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False): 471 if fcs: 472 frame = L2capMatchers._information_frame_with_fcs(packet) 473 else: 474 frame = L2capMatchers._information_frame(packet) 475 if frame is None: 476 return False 477 if tx_seq is not None and frame.GetTxSeq() != tx_seq: 478 return False 479 if payload is not None and frame.GetPayload().GetBytes() != payload: 480 return False 481 if f is not None and frame.GetF() != f: 482 return False 483 return True 484 485 @staticmethod 486 def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False): 487 if fcs: 488 frame = L2capMatchers._information_start_frame_with_fcs(packet) 489 else: 490 frame = L2capMatchers._information_start_frame(packet) 491 if frame is None: 492 return False 493 if tx_seq is not None and frame.GetTxSeq() != tx_seq: 494 return False 495 if payload is not None and frame.GetPayload().GetBytes() != payload: 496 return False 497 if f is not None and frame.GetF() != f: 498 return False 499 return True 500 501 @staticmethod 502 def _is_matching_supervisory_frame(packet, req_seq, f, s, p): 503 frame = L2capMatchers._supervisory_frame(packet) 504 if frame is None: 505 return False 506 if req_seq is not None and frame.GetReqSeq() != req_seq: 507 return False 508 if f is not None and frame.GetF() != f: 509 return False 510 if s is not None and frame.GetS() != s: 511 return False 512 if p is not None and frame.GetP() != p: 513 return False 514 return True 515 516 @staticmethod 517 def _is_matching_first_le_i_frame(packet, payload, sdu_size): 518 first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet) 519 return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size 520 521 @staticmethod 522 def _control_frame(packet): 523 if packet.GetChannelId() != 1: 524 return None 525 return l2cap_packets.ControlView(packet.GetPayload()) 526 527 @staticmethod 528 def _le_control_frame(packet): 529 if packet.GetChannelId() != 5: 530 return None 531 return l2cap_packets.LeControlView(packet.GetPayload()) 532 533 @staticmethod 534 def control_frame_with_code(packet, code): 535 frame = L2capMatchers._control_frame(packet) 536 if frame is None or frame.GetCode() != code: 537 return None 538 return frame 539 540 @staticmethod 541 def le_control_frame_with_code(packet, code): 542 frame = L2capMatchers._le_control_frame(packet) 543 if frame is None or frame.GetCode() != code: 544 return None 545 return frame 546 547 @staticmethod 548 def _is_control_frame_with_code(packet, code): 549 return L2capMatchers.control_frame_with_code(packet, code) is not None 550 551 @staticmethod 552 def _is_le_control_frame_with_code(packet, code): 553 return L2capMatchers.le_control_frame_with_code(packet, code) is not None 554 555 @staticmethod 556 def _is_matching_connection_request(packet, psm): 557 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST) 558 if frame is None: 559 return False 560 request = l2cap_packets.ConnectionRequestView(frame) 561 return request.GetPsm() == psm 562 563 @staticmethod 564 def _is_matching_connection_response(packet, scid): 565 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE) 566 if frame is None: 567 return False 568 response = l2cap_packets.ConnectionResponseView(frame) 569 return response.GetSourceCid() == scid and response.GetResult( 570 ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid() != 0 571 572 @staticmethod 573 def _is_matching_configuration_request_with_cid(packet, cid=None): 574 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST) 575 if frame is None: 576 return False 577 request = l2cap_packets.ConfigurationRequestView(frame) 578 dcid = request.GetDestinationCid() 579 return cid is None or cid == dcid 580 581 @staticmethod 582 def _is_matching_configuration_request_with_ertm(packet): 583 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST) 584 if frame is None: 585 return False 586 request = l2cap_packets.ConfigurationRequestView(frame) 587 config_bytes = request.GetBytes() 588 # TODO(b/153189503): Use packet struct parser. 589 return b"\x04\x09\x03" in config_bytes 590 591 @staticmethod 592 def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS): 593 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE) 594 if frame is None: 595 return False 596 response = l2cap_packets.ConfigurationResponseView(frame) 597 return response.GetResult() == result 598 599 @staticmethod 600 def _is_matching_disconnection_request(packet, scid, dcid): 601 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST) 602 if frame is None: 603 return False 604 request = l2cap_packets.DisconnectionRequestView(frame) 605 return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid 606 607 @staticmethod 608 def _is_matching_disconnection_response(packet, scid, dcid): 609 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE) 610 if frame is None: 611 return False 612 response = l2cap_packets.DisconnectionResponseView(frame) 613 return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid 614 615 @staticmethod 616 def _is_matching_le_disconnection_response(packet, scid, dcid): 617 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE) 618 if frame is None: 619 return False 620 response = l2cap_packets.LeDisconnectionResponseView(frame) 621 return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid 622 623 @staticmethod 624 def _is_matching_le_disconnection_request(packet, scid, dcid): 625 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST) 626 if frame is None: 627 return False 628 request = l2cap_packets.LeDisconnectionRequestView(frame) 629 return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid 630 631 @staticmethod 632 def _is_matching_le_flow_control_credit(packet, cid): 633 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT) 634 if frame is None: 635 return False 636 request = l2cap_packets.LeFlowControlCreditView(frame) 637 return request.GetCid() == cid 638 639 @staticmethod 640 def _information_request_with_type(packet, info_type): 641 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST) 642 if frame is None: 643 return None 644 request = l2cap_packets.InformationRequestView(frame) 645 if request.GetInfoType() != info_type: 646 return None 647 return request 648 649 @staticmethod 650 def _information_response_with_type(packet, info_type): 651 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE) 652 if frame is None: 653 return None 654 response = l2cap_packets.InformationResponseView(frame) 655 if response.GetInfoType() != info_type: 656 return None 657 return response 658 659 @staticmethod 660 def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, 661 supports_fixed_channels): 662 frame = L2capMatchers._information_response_with_type(packet, 663 InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) 664 if frame is None: 665 return False 666 features = l2cap_packets.InformationResponseExtendedFeaturesView(frame) 667 if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm: 668 return False 669 if supports_streaming is not None and features.GetStreamingMode != supports_streaming: 670 return False 671 if supports_fcs is not None and features.GetFcsOption() != supports_fcs: 672 return False 673 if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels: 674 return False 675 return True 676 677 @staticmethod 678 def _is_matching_connection_parameter_update_response(packet, result): 679 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE) 680 if frame is None: 681 return False 682 return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result 683 684 @staticmethod 685 def _is_matching_credit_based_connection_request(packet, psm): 686 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) 687 if frame is None: 688 return False 689 request = l2cap_packets.LeCreditBasedConnectionRequestView(frame) 690 return request.GetLePsm() == psm 691 692 @staticmethod 693 def _is_matching_credit_based_connection_response(packet, result): 694 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE) 695 if frame is None: 696 return False 697 response = l2cap_packets.LeCreditBasedConnectionResponseView(frame) 698 return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or 699 response.GetDestinationCid() != 0) 700 701 @staticmethod 702 def LinkSecurityInterfaceCallbackEvent(type): 703 return lambda event: True if event.event_type == type else False 704 705 706class SecurityMatchers(object): 707 708 @staticmethod 709 def UiMsg(type, address=None): 710 return lambda event: True if event.message_type == type and (address == None or address == event.peer) else False 711 712 @staticmethod 713 def BondMsg(type, address=None, reason=None): 714 return lambda event: True if event.message_type == type and (address == None or address == event.peer) and (reason == None or reason == event.reason) else False 715 716 @staticmethod 717 def HelperMsg(type, address=None): 718 return lambda event: True if event.message_type == type and (address == None or address == event.peer) else False 719 720 721class IsoMatchers(object): 722 723 @staticmethod 724 def Data(payload): 725 return lambda packet: packet.payload == payload 726 727 @staticmethod 728 def PacketPayloadWithMatchingCisHandle(cis_handle): 729 return lambda packet: None if cis_handle != packet.handle else packet 730