1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import bluetooth_packets_python3 as bt_packets 18import logging 19import sys 20 21from bluetooth_packets_python3 import l2cap_packets 22from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode 23from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult 24from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult 25from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType 26from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult 27 28from blueberry.utils import bluetooth 29import hci_packets as hci 30 31 32class HciMatchers(object): 33 34 @staticmethod 35 def CommandComplete(opcode): 36 return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode) 37 38 @staticmethod 39 def ExtractMatchingCommandComplete(packet_bytes, opcode=None): 40 return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) 41 42 @staticmethod 43 def _is_matching_command_complete(packet_bytes, opcode=None): 44 return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None 45 46 @staticmethod 47 def _extract_matching_command_complete(packet_bytes, opcode=None): 48 event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_COMPLETE) 49 if not isinstance(event, hci.CommandComplete): 50 return None 51 if opcode and event.command_op_code != opcode: 52 return None 53 return event 54 55 @staticmethod 56 def CommandStatus(opcode=None): 57 return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode) 58 59 @staticmethod 60 def ExtractMatchingCommandStatus(packet_bytes, opcode=None): 61 return HciMatchers._extract_matching_command_status(packet_bytes, opcode) 62 63 @staticmethod 64 def _is_matching_command_status(packet_bytes, opcode=None): 65 return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None 66 67 @staticmethod 68 def _extract_matching_command_status(packet_bytes, opcode=None): 69 event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_STATUS) 70 if not isinstance(event, hci.CommandStatus): 71 return None 72 if opcode and event.command_op_code != opcode: 73 return None 74 return event 75 76 @staticmethod 77 def EventWithCode(event_code): 78 return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code) 79 80 @staticmethod 81 def ExtractEventWithCode(packet_bytes, event_code): 82 return HciMatchers._extract_matching_event(packet_bytes, event_code) 83 84 @staticmethod 85 def _is_matching_event(packet_bytes, event_code): 86 return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None 87 88 @staticmethod 89 def _extract_matching_event(packet_bytes, event_code): 90 try: 91 event = hci.Event.parse_all(packet_bytes) 92 return event if event.event_code == event_code else None 93 except Exception as exn: 94 print(sys.stderr, f"Failed to parse incoming event: {exn}") 95 print(sys.stderr, f"Event data: {' '.join([f'{b:02x}' for b in packet_bytes])}") 96 return None 97 98 @staticmethod 99 def LeEventWithCode(subevent_code): 100 return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None 101 102 @staticmethod 103 def ExtractLeEventWithCode(packet_bytes, subevent_code): 104 return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code) 105 106 @staticmethod 107 def _extract_matching_le_event(packet_bytes, subevent_code): 108 event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.LE_META_EVENT) 109 if (not isinstance(event, hci.LeMetaEvent) or event.subevent_code != subevent_code): 110 return None 111 112 return event 113 114 @staticmethod 115 def LeAdvertisement(subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None): 116 return lambda msg: HciMatchers._extract_matching_le_advertisement(msg.payload, subevent_code, address, data 117 ) is not None 118 119 @staticmethod 120 def ExtractLeAdvertisement(packet_bytes, 121 subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT, 122 address=None, 123 data=None): 124 return HciMatchers._extract_matching_le_advertisement(packet_bytes, subevent_code, address, data) 125 126 @staticmethod 127 def _extract_matching_le_advertisement(packet_bytes, 128 subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT, 129 address=None, 130 data=None): 131 event = HciMatchers._extract_matching_le_event(packet_bytes, subevent_code) 132 if event is None: 133 return None 134 135 matched = False 136 for response in event.responses: 137 matched |= (address == None or response.address == bluetooth.Address(address)) and (data == None or 138 data in packet_bytes) 139 140 return event if matched else None 141 142 @staticmethod 143 def LeConnectionComplete(): 144 return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None 145 146 @staticmethod 147 def ExtractLeConnectionComplete(packet_bytes): 148 return HciMatchers._extract_le_connection_complete(packet_bytes) 149 150 @staticmethod 151 def _extract_le_connection_complete(packet_bytes): 152 event = HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.CONNECTION_COMPLETE) 153 if event is not None: 154 return event 155 156 return HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE) 157 158 @staticmethod 159 def LogEventCode(): 160 return lambda event: logging.info("Received event: %x" % hci.Event.parse(event.payload).event_code) 161 162 @staticmethod 163 def LinkKeyRequest(): 164 return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_REQUEST) 165 166 @staticmethod 167 def IoCapabilityRequest(): 168 return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_REQUEST) 169 170 @staticmethod 171 def IoCapabilityResponse(): 172 return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_RESPONSE) 173 174 @staticmethod 175 def UserPasskeyNotification(): 176 return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_NOTIFICATION) 177 178 @staticmethod 179 def UserPasskeyRequest(): 180 return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST) 181 182 @staticmethod 183 def UserConfirmationRequest(): 184 return HciMatchers.EventWithCode(hci.EventCode.USER_CONFIRMATION_REQUEST) 185 186 @staticmethod 187 def RemoteHostSupportedFeaturesNotification(): 188 return HciMatchers.EventWithCode(hci.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION) 189 190 @staticmethod 191 def LinkKeyNotification(): 192 return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_NOTIFICATION) 193 194 @staticmethod 195 def SimplePairingComplete(): 196 return HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE) 197 198 @staticmethod 199 def Disconnect(): 200 return HciMatchers.EventWithCode(hci.EventCode.DISCONNECT) 201 202 @staticmethod 203 def DisconnectionComplete(): 204 return HciMatchers.EventWithCode(hci.EventCode.DISCONNECTION_COMPLETE) 205 206 @staticmethod 207 def RemoteOobDataRequest(): 208 return HciMatchers.EventWithCode(hci.EventCode.REMOTE_OOB_DATA_REQUEST) 209 210 @staticmethod 211 def PinCodeRequest(): 212 return HciMatchers.EventWithCode(hci.EventCode.PIN_CODE_REQUEST) 213 214 @staticmethod 215 def LoopbackOf(packet): 216 return HciMatchers.Exactly(hci.LoopbackCommand(payload=packet)) 217 218 @staticmethod 219 def Exactly(packet): 220 data = bytes(packet.serialize()) 221 return lambda event: data == event.payload 222 223 224class AdvertisingMatchers(object): 225 226 @staticmethod 227 def AdvertisingCallbackMsg(type, advertiser_id=None, status=None, data=None): 228 return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \ 229 and (status == None or status == event.status) and (data == None or data == event.data) else False 230 231 @staticmethod 232 def AddressMsg(type, advertiser_id=None, address=None): 233 return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \ 234 and (address == None or address == event.address) else False 235 236 237class ScanningMatchers(object): 238 239 @staticmethod 240 def ScanningCallbackMsg(type, status=None, data=None): 241 return lambda event: True if event.message_type == type and (status == None or status == event.status) \ 242 and (data == None or data == event.data) else False 243 244 245class NeighborMatchers(object): 246 247 @staticmethod 248 def InquiryResult(address): 249 return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address) 250 251 @staticmethod 252 def _is_matching_inquiry_result(packet, address): 253 event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT) 254 if not isinstance(event, hci.InquiryResult): 255 return False 256 return any((bluetooth.Address(address) == response.bd_addr for response in event.responses)) 257 258 @staticmethod 259 def InquiryResultwithRssi(address): 260 return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address) 261 262 @staticmethod 263 def _is_matching_inquiry_result_with_rssi(packet, address): 264 event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT_WITH_RSSI) 265 if not isinstance(event, hci.InquiryResultWithRssi): 266 return False 267 return any((bluetooth.Address(address) == response.address for response in event.responses)) 268 269 @staticmethod 270 def ExtendedInquiryResult(address): 271 return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address) 272 273 @staticmethod 274 def _is_matching_extended_inquiry_result(packet, address): 275 event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.EXTENDED_INQUIRY_RESULT) 276 if not isinstance(event, (hci.ExtendedInquiryResult, hci.ExtendedInquiryResultRaw)): 277 return False 278 return bluetooth.Address(address) == event.address 279 280 281class L2capMatchers(object): 282 283 @staticmethod 284 def ConnectionRequest(psm): 285 return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm) 286 287 @staticmethod 288 def ConnectionResponse(scid): 289 return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid) 290 291 @staticmethod 292 def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS): 293 return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result) 294 295 @staticmethod 296 def ConfigurationRequest(cid=None): 297 return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid) 298 299 @staticmethod 300 def ConfigurationRequestWithErtm(): 301 return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet) 302 303 @staticmethod 304 def ConfigurationRequestView(dcid): 305 return lambda request_view: request_view.GetDestinationCid() == dcid 306 307 @staticmethod 308 def DisconnectionRequest(scid, dcid): 309 return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid) 310 311 @staticmethod 312 def DisconnectionResponse(scid, dcid): 313 return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid) 314 315 @staticmethod 316 def EchoResponse(): 317 return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE) 318 319 @staticmethod 320 def CommandReject(): 321 return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT) 322 323 @staticmethod 324 def LeCommandReject(): 325 return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT) 326 327 @staticmethod 328 def LeConnectionParameterUpdateRequest(): 329 return lambda packet: L2capMatchers._is_le_control_frame_with_code( 330 packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST) 331 332 @staticmethod 333 def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED): 334 return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result) 335 336 @staticmethod 337 def CreditBasedConnectionRequest(psm): 338 return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm) 339 340 @staticmethod 341 def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS): 342 return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result) 343 344 @staticmethod 345 def CreditBasedConnectionResponseUsedCid(): 346 return lambda packet: L2capMatchers._is_matching_credit_based_connection_response( 347 packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED 348 ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT) 349 350 @staticmethod 351 def LeDisconnectionRequest(scid, dcid): 352 return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid) 353 354 @staticmethod 355 def LeDisconnectionResponse(scid, dcid): 356 return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid) 357 358 @staticmethod 359 def LeFlowControlCredit(cid): 360 return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid) 361 362 @staticmethod 363 def SFrame(req_seq=None, f=None, s=None, p=None): 364 return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p) 365 366 @staticmethod 367 def IFrame(tx_seq=None, payload=None, f=None): 368 return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False) 369 370 @staticmethod 371 def IFrameWithFcs(tx_seq=None, payload=None, f=None): 372 return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True) 373 374 @staticmethod 375 def IFrameStart(tx_seq=None, payload=None, f=None): 376 return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False) 377 378 @staticmethod 379 def Data(payload): 380 return lambda packet: packet.GetPayload().GetBytes() == payload 381 382 @staticmethod 383 def FirstLeIFrame(payload, sdu_size): 384 return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size) 385 386 # this is a hack - should be removed 387 @staticmethod 388 def PartialData(payload): 389 return lambda packet: payload in packet.GetPayload().GetBytes() 390 391 # this is a hack - should be removed 392 @staticmethod 393 def PacketPayloadRawData(payload): 394 return lambda packet: payload in packet.payload 395 396 # this is a hack - should be removed 397 @staticmethod 398 def PacketPayloadWithMatchingPsm(psm): 399 return lambda packet: None if psm != packet.psm else packet 400 401 # this is a hack - should be removed 402 @staticmethod 403 def PacketPayloadWithMatchingCid(cid): 404 return lambda packet: None if cid != packet.fixed_cid else packet 405 406 @staticmethod 407 def ExtractBasicFrame(scid): 408 return lambda packet: L2capMatchers._basic_frame_for(packet, scid) 409 410 @staticmethod 411 def ExtractBasicFrameWithFcs(scid): 412 return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid) 413 414 @staticmethod 415 def InformationRequestWithType(info_type): 416 return lambda packet: L2capMatchers._information_request_with_type(packet, info_type) 417 418 @staticmethod 419 def InformationResponseExtendedFeatures(supports_ertm=None, 420 supports_streaming=None, 421 supports_fcs=None, 422 supports_fixed_channels=None): 423 return lambda packet: L2capMatchers._is_matching_information_response_extended_features( 424 packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels) 425 426 @staticmethod 427 def _basic_frame(packet): 428 if packet is None: 429 return None 430 return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload))) 431 432 @staticmethod 433 def _basic_frame_with_fcs(packet): 434 if packet is None: 435 return None 436 return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload))) 437 438 @staticmethod 439 def _basic_frame_for(packet, scid): 440 frame = L2capMatchers._basic_frame(packet) 441 if frame.GetChannelId() != scid: 442 return None 443 return frame 444 445 @staticmethod 446 def _basic_frame_with_fcs_for(packet, scid): 447 frame = L2capMatchers._basic_frame(packet) 448 if frame.GetChannelId() != scid: 449 return None 450 frame = L2capMatchers._basic_frame_with_fcs(packet) 451 if frame is None: 452 return None 453 return frame 454 455 @staticmethod 456 def _information_frame(packet): 457 standard_frame = l2cap_packets.StandardFrameView(packet) 458 if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: 459 return None 460 return l2cap_packets.EnhancedInformationFrameView(standard_frame) 461 462 @staticmethod 463 def _information_frame_with_fcs(packet): 464 standard_frame = l2cap_packets.StandardFrameWithFcsView(packet) 465 if standard_frame is None: 466 return None 467 if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME: 468 return None 469 return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame) 470 471 @staticmethod 472 def _information_start_frame(packet): 473 start_frame = L2capMatchers._information_frame(packet) 474 if start_frame is None: 475 return None 476 return l2cap_packets.EnhancedInformationStartFrameView(start_frame) 477 478 @staticmethod 479 def _information_start_frame_with_fcs(packet): 480 start_frame = L2capMatchers._information_frame_with_fcs(packet) 481 if start_frame is None: 482 return None 483 return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame) 484 485 @staticmethod 486 def _supervisory_frame(packet): 487 standard_frame = l2cap_packets.StandardFrameView(packet) 488 if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME: 489 return None 490 return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame) 491 492 @staticmethod 493 def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False): 494 if fcs: 495 frame = L2capMatchers._information_frame_with_fcs(packet) 496 else: 497 frame = L2capMatchers._information_frame(packet) 498 if frame is None: 499 return False 500 if tx_seq is not None and frame.GetTxSeq() != tx_seq: 501 return False 502 if payload is not None and frame.GetPayload().GetBytes() != payload: 503 return False 504 if f is not None and frame.GetF() != f: 505 return False 506 return True 507 508 @staticmethod 509 def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False): 510 if fcs: 511 frame = L2capMatchers._information_start_frame_with_fcs(packet) 512 else: 513 frame = L2capMatchers._information_start_frame(packet) 514 if frame is None: 515 return False 516 if tx_seq is not None and frame.GetTxSeq() != tx_seq: 517 return False 518 if payload is not None and frame.GetPayload().GetBytes() != payload: 519 return False 520 if f is not None and frame.GetF() != f: 521 return False 522 return True 523 524 @staticmethod 525 def _is_matching_supervisory_frame(packet, req_seq, f, s, p): 526 frame = L2capMatchers._supervisory_frame(packet) 527 if frame is None: 528 return False 529 if req_seq is not None and frame.GetReqSeq() != req_seq: 530 return False 531 if f is not None and frame.GetF() != f: 532 return False 533 if s is not None and frame.GetS() != s: 534 return False 535 if p is not None and frame.GetP() != p: 536 return False 537 return True 538 539 @staticmethod 540 def _is_matching_first_le_i_frame(packet, payload, sdu_size): 541 first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet) 542 return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size 543 544 @staticmethod 545 def _control_frame(packet): 546 if packet.GetChannelId() != 1: 547 return None 548 return l2cap_packets.ControlView(packet.GetPayload()) 549 550 @staticmethod 551 def _le_control_frame(packet): 552 if packet.GetChannelId() != 5: 553 return None 554 return l2cap_packets.LeControlView(packet.GetPayload()) 555 556 @staticmethod 557 def control_frame_with_code(packet, code): 558 frame = L2capMatchers._control_frame(packet) 559 if frame is None or frame.GetCode() != code: 560 return None 561 return frame 562 563 @staticmethod 564 def le_control_frame_with_code(packet, code): 565 frame = L2capMatchers._le_control_frame(packet) 566 if frame is None or frame.GetCode() != code: 567 return None 568 return frame 569 570 @staticmethod 571 def _is_control_frame_with_code(packet, code): 572 return L2capMatchers.control_frame_with_code(packet, code) is not None 573 574 @staticmethod 575 def _is_le_control_frame_with_code(packet, code): 576 return L2capMatchers.le_control_frame_with_code(packet, code) is not None 577 578 @staticmethod 579 def _is_matching_connection_request(packet, psm): 580 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST) 581 if frame is None: 582 return False 583 request = l2cap_packets.ConnectionRequestView(frame) 584 return request.GetPsm() == psm 585 586 @staticmethod 587 def _is_matching_connection_response(packet, scid): 588 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE) 589 if frame is None: 590 return False 591 response = l2cap_packets.ConnectionResponseView(frame) 592 return response.GetSourceCid() == scid and response.GetResult( 593 ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid() != 0 594 595 @staticmethod 596 def _is_matching_configuration_request_with_cid(packet, cid=None): 597 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST) 598 if frame is None: 599 return False 600 request = l2cap_packets.ConfigurationRequestView(frame) 601 dcid = request.GetDestinationCid() 602 return cid is None or cid == dcid 603 604 @staticmethod 605 def _is_matching_configuration_request_with_ertm(packet): 606 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST) 607 if frame is None: 608 return False 609 request = l2cap_packets.ConfigurationRequestView(frame) 610 config_bytes = request.GetBytes() 611 # TODO(b/153189503): Use packet struct parser. 612 return b"\x04\x09\x03" in config_bytes 613 614 @staticmethod 615 def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS): 616 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE) 617 if frame is None: 618 return False 619 response = l2cap_packets.ConfigurationResponseView(frame) 620 return response.GetResult() == result 621 622 @staticmethod 623 def _is_matching_disconnection_request(packet, scid, dcid): 624 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST) 625 if frame is None: 626 return False 627 request = l2cap_packets.DisconnectionRequestView(frame) 628 return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid 629 630 @staticmethod 631 def _is_matching_disconnection_response(packet, scid, dcid): 632 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE) 633 if frame is None: 634 return False 635 response = l2cap_packets.DisconnectionResponseView(frame) 636 return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid 637 638 @staticmethod 639 def _is_matching_le_disconnection_response(packet, scid, dcid): 640 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE) 641 if frame is None: 642 return False 643 response = l2cap_packets.LeDisconnectionResponseView(frame) 644 return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid 645 646 @staticmethod 647 def _is_matching_le_disconnection_request(packet, scid, dcid): 648 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST) 649 if frame is None: 650 return False 651 request = l2cap_packets.LeDisconnectionRequestView(frame) 652 return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid 653 654 @staticmethod 655 def _is_matching_le_flow_control_credit(packet, cid): 656 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT) 657 if frame is None: 658 return False 659 request = l2cap_packets.LeFlowControlCreditView(frame) 660 return request.GetCid() == cid 661 662 @staticmethod 663 def _information_request_with_type(packet, info_type): 664 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST) 665 if frame is None: 666 return None 667 request = l2cap_packets.InformationRequestView(frame) 668 if request.GetInfoType() != info_type: 669 return None 670 return request 671 672 @staticmethod 673 def _information_response_with_type(packet, info_type): 674 frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE) 675 if frame is None: 676 return None 677 response = l2cap_packets.InformationResponseView(frame) 678 if response.GetInfoType() != info_type: 679 return None 680 return response 681 682 @staticmethod 683 def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, 684 supports_fixed_channels): 685 frame = L2capMatchers._information_response_with_type(packet, 686 InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) 687 if frame is None: 688 return False 689 features = l2cap_packets.InformationResponseExtendedFeaturesView(frame) 690 if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm: 691 return False 692 if supports_streaming is not None and features.GetStreamingMode != supports_streaming: 693 return False 694 if supports_fcs is not None and features.GetFcsOption() != supports_fcs: 695 return False 696 if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels: 697 return False 698 return True 699 700 @staticmethod 701 def _is_matching_connection_parameter_update_response(packet, result): 702 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE) 703 if frame is None: 704 return False 705 return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result 706 707 @staticmethod 708 def _is_matching_credit_based_connection_request(packet, psm): 709 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST) 710 if frame is None: 711 return False 712 request = l2cap_packets.LeCreditBasedConnectionRequestView(frame) 713 return request.GetLePsm() == psm 714 715 @staticmethod 716 def _is_matching_credit_based_connection_response(packet, result): 717 frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE) 718 if frame is None: 719 return False 720 response = l2cap_packets.LeCreditBasedConnectionResponseView(frame) 721 return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or 722 response.GetDestinationCid() != 0) 723 724 @staticmethod 725 def LinkSecurityInterfaceCallbackEvent(type): 726 return lambda event: True if event.event_type == type else False 727 728 729class SecurityMatchers(object): 730 731 @staticmethod 732 def UiMsg(type, address=None): 733 return lambda event: True if event.message_type == type and (address == None or address == event.peer 734 ) else False 735 736 @staticmethod 737 def BondMsg(type, address=None, reason=None): 738 return lambda event: True if event.message_type == type and (address == None or address == event.peer) and ( 739 reason == None or reason == event.reason) else False 740 741 @staticmethod 742 def HelperMsg(type, address=None): 743 return lambda event: True if event.message_type == type and (address == None or address == event.peer 744 ) else False 745 746 747class IsoMatchers(object): 748 749 @staticmethod 750 def Data(payload): 751 return lambda packet: packet.payload == payload 752 753 @staticmethod 754 def PacketPayloadWithMatchingCisHandle(cis_handle): 755 return lambda packet: None if cis_handle != packet.handle else packet 756