• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import bluetooth_packets_python3 as bt_packets
18import logging
19import sys
20
21from bluetooth_packets_python3 import l2cap_packets
22from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
23from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
24from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
25from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
26from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
27
28from blueberry.utils import bluetooth
29import hci_packets as hci
30
31
32class HciMatchers(object):
33
34    @staticmethod
35    def CommandComplete(opcode):
36        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)
37
38    @staticmethod
39    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
40        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)
41
42    @staticmethod
43    def _is_matching_command_complete(packet_bytes, opcode=None):
44        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None
45
46    @staticmethod
47    def _extract_matching_command_complete(packet_bytes, opcode=None):
48        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_COMPLETE)
49        if not isinstance(event, hci.CommandComplete):
50            return None
51        if opcode and event.command_op_code != opcode:
52            return None
53        return event
54
55    @staticmethod
56    def CommandStatus(opcode=None):
57        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)
58
59    @staticmethod
60    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
61        return HciMatchers._extract_matching_command_status(packet_bytes, opcode)
62
63    @staticmethod
64    def _is_matching_command_status(packet_bytes, opcode=None):
65        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None
66
67    @staticmethod
68    def _extract_matching_command_status(packet_bytes, opcode=None):
69        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_STATUS)
70        if not isinstance(event, hci.CommandStatus):
71            return None
72        if opcode and event.command_op_code != opcode:
73            return None
74        return event
75
76    @staticmethod
77    def EventWithCode(event_code):
78        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)
79
80    @staticmethod
81    def ExtractEventWithCode(packet_bytes, event_code):
82        return HciMatchers._extract_matching_event(packet_bytes, event_code)
83
84    @staticmethod
85    def _is_matching_event(packet_bytes, event_code):
86        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None
87
88    @staticmethod
89    def _extract_matching_event(packet_bytes, event_code):
90        try:
91            event = hci.Event.parse_all(packet_bytes)
92            return event if event.event_code == event_code else None
93        except Exception as exn:
94            print(sys.stderr, f"Failed to parse incoming event: {exn}")
95            print(sys.stderr, f"Event data: {' '.join([f'{b:02x}' for b in packet_bytes])}")
96            return None
97
98    @staticmethod
99    def LeEventWithCode(subevent_code):
100        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None
101
102    @staticmethod
103    def ExtractLeEventWithCode(packet_bytes, subevent_code):
104        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
105
106    @staticmethod
107    def _extract_matching_le_event(packet_bytes, subevent_code):
108        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.LE_META_EVENT)
109        if (not isinstance(event, hci.LeMetaEvent) or event.subevent_code != subevent_code):
110            return None
111
112        return event
113
114    @staticmethod
115    def LeAdvertisement(subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None):
116        return lambda msg: HciMatchers._extract_matching_le_advertisement(msg.payload, subevent_code, address, data
117                                                                         ) is not None
118
119    @staticmethod
120    def ExtractLeAdvertisement(packet_bytes,
121                               subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
122                               address=None,
123                               data=None):
124        return HciMatchers._extract_matching_le_advertisement(packet_bytes, subevent_code, address, data)
125
126    @staticmethod
127    def _extract_matching_le_advertisement(packet_bytes,
128                                           subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
129                                           address=None,
130                                           data=None):
131        event = HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
132        if event is None:
133            return None
134
135        matched = False
136        for response in event.responses:
137            matched |= (address == None or response.address == bluetooth.Address(address)) and (data == None or
138                                                                                                data in packet_bytes)
139
140        return event if matched else None
141
142    @staticmethod
143    def LeConnectionComplete():
144        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None
145
146    @staticmethod
147    def ExtractLeConnectionComplete(packet_bytes):
148        return HciMatchers._extract_le_connection_complete(packet_bytes)
149
150    @staticmethod
151    def _extract_le_connection_complete(packet_bytes):
152        event = HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.CONNECTION_COMPLETE)
153        if event is not None:
154            return event
155
156        return HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
157
158    @staticmethod
159    def LogEventCode():
160        return lambda event: logging.info("Received event: %x" % hci.Event.parse(event.payload).event_code)
161
162    @staticmethod
163    def LinkKeyRequest():
164        return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_REQUEST)
165
166    @staticmethod
167    def IoCapabilityRequest():
168        return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_REQUEST)
169
170    @staticmethod
171    def IoCapabilityResponse():
172        return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_RESPONSE)
173
174    @staticmethod
175    def UserPasskeyNotification():
176        return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_NOTIFICATION)
177
178    @staticmethod
179    def UserPasskeyRequest():
180        return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST)
181
182    @staticmethod
183    def UserConfirmationRequest():
184        return HciMatchers.EventWithCode(hci.EventCode.USER_CONFIRMATION_REQUEST)
185
186    @staticmethod
187    def RemoteHostSupportedFeaturesNotification():
188        return HciMatchers.EventWithCode(hci.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
189
190    @staticmethod
191    def LinkKeyNotification():
192        return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_NOTIFICATION)
193
194    @staticmethod
195    def SimplePairingComplete():
196        return HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE)
197
198    @staticmethod
199    def Disconnect():
200        return HciMatchers.EventWithCode(hci.EventCode.DISCONNECT)
201
202    @staticmethod
203    def DisconnectionComplete():
204        return HciMatchers.EventWithCode(hci.EventCode.DISCONNECTION_COMPLETE)
205
206    @staticmethod
207    def RemoteOobDataRequest():
208        return HciMatchers.EventWithCode(hci.EventCode.REMOTE_OOB_DATA_REQUEST)
209
210    @staticmethod
211    def PinCodeRequest():
212        return HciMatchers.EventWithCode(hci.EventCode.PIN_CODE_REQUEST)
213
214    @staticmethod
215    def LoopbackOf(packet):
216        return HciMatchers.Exactly(hci.LoopbackCommand(payload=packet))
217
218    @staticmethod
219    def Exactly(packet):
220        data = bytes(packet.serialize())
221        return lambda event: data == event.payload
222
223
224class AdvertisingMatchers(object):
225
226    @staticmethod
227    def AdvertisingCallbackMsg(type, advertiser_id=None, status=None, data=None):
228        return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \
229            and (status == None or status == event.status) and (data == None or data == event.data) else False
230
231    @staticmethod
232    def AddressMsg(type, advertiser_id=None, address=None):
233        return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \
234            and (address == None or address == event.address) else False
235
236
237class ScanningMatchers(object):
238
239    @staticmethod
240    def ScanningCallbackMsg(type, status=None, data=None):
241        return lambda event: True if event.message_type == type and (status == None or status == event.status) \
242            and (data == None or data == event.data) else False
243
244
245class NeighborMatchers(object):
246
247    @staticmethod
248    def InquiryResult(address):
249        return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address)
250
251    @staticmethod
252    def _is_matching_inquiry_result(packet, address):
253        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT)
254        if not isinstance(event, hci.InquiryResult):
255            return False
256        return any((bluetooth.Address(address) == response.bd_addr for response in event.responses))
257
258    @staticmethod
259    def InquiryResultwithRssi(address):
260        return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)
261
262    @staticmethod
263    def _is_matching_inquiry_result_with_rssi(packet, address):
264        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT_WITH_RSSI)
265        if not isinstance(event, hci.InquiryResultWithRssi):
266            return False
267        return any((bluetooth.Address(address) == response.address for response in event.responses))
268
269    @staticmethod
270    def ExtendedInquiryResult(address):
271        return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)
272
273    @staticmethod
274    def _is_matching_extended_inquiry_result(packet, address):
275        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.EXTENDED_INQUIRY_RESULT)
276        if not isinstance(event, (hci.ExtendedInquiryResult, hci.ExtendedInquiryResultRaw)):
277            return False
278        return bluetooth.Address(address) == event.address
279
280
281class L2capMatchers(object):
282
283    @staticmethod
284    def ConnectionRequest(psm):
285        return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm)
286
287    @staticmethod
288    def ConnectionResponse(scid):
289        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)
290
291    @staticmethod
292    def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS):
293        return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result)
294
295    @staticmethod
296    def ConfigurationRequest(cid=None):
297        return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid)
298
299    @staticmethod
300    def ConfigurationRequestWithErtm():
301        return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet)
302
303    @staticmethod
304    def ConfigurationRequestView(dcid):
305        return lambda request_view: request_view.GetDestinationCid() == dcid
306
307    @staticmethod
308    def DisconnectionRequest(scid, dcid):
309        return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid)
310
311    @staticmethod
312    def DisconnectionResponse(scid, dcid):
313        return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)
314
315    @staticmethod
316    def EchoResponse():
317        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE)
318
319    @staticmethod
320    def CommandReject():
321        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)
322
323    @staticmethod
324    def LeCommandReject():
325        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)
326
327    @staticmethod
328    def LeConnectionParameterUpdateRequest():
329        return lambda packet: L2capMatchers._is_le_control_frame_with_code(
330            packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST)
331
332    @staticmethod
333    def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED):
334        return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result)
335
336    @staticmethod
337    def CreditBasedConnectionRequest(psm):
338        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)
339
340    @staticmethod
341    def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS):
342        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result)
343
344    @staticmethod
345    def CreditBasedConnectionResponseUsedCid():
346        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(
347            packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED
348        ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)
349
350    @staticmethod
351    def LeDisconnectionRequest(scid, dcid):
352        return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)
353
354    @staticmethod
355    def LeDisconnectionResponse(scid, dcid):
356        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)
357
358    @staticmethod
359    def LeFlowControlCredit(cid):
360        return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid)
361
362    @staticmethod
363    def SFrame(req_seq=None, f=None, s=None, p=None):
364        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)
365
366    @staticmethod
367    def IFrame(tx_seq=None, payload=None, f=None):
368        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False)
369
370    @staticmethod
371    def IFrameWithFcs(tx_seq=None, payload=None, f=None):
372        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True)
373
374    @staticmethod
375    def IFrameStart(tx_seq=None, payload=None, f=None):
376        return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False)
377
378    @staticmethod
379    def Data(payload):
380        return lambda packet: packet.GetPayload().GetBytes() == payload
381
382    @staticmethod
383    def FirstLeIFrame(payload, sdu_size):
384        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)
385
386    # this is a hack - should be removed
387    @staticmethod
388    def PartialData(payload):
389        return lambda packet: payload in packet.GetPayload().GetBytes()
390
391    # this is a hack - should be removed
392    @staticmethod
393    def PacketPayloadRawData(payload):
394        return lambda packet: payload in packet.payload
395
396    # this is a hack - should be removed
397    @staticmethod
398    def PacketPayloadWithMatchingPsm(psm):
399        return lambda packet: None if psm != packet.psm else packet
400
401    # this is a hack - should be removed
402    @staticmethod
403    def PacketPayloadWithMatchingCid(cid):
404        return lambda packet: None if cid != packet.fixed_cid else packet
405
406    @staticmethod
407    def ExtractBasicFrame(scid):
408        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
409
410    @staticmethod
411    def ExtractBasicFrameWithFcs(scid):
412        return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid)
413
414    @staticmethod
415    def InformationRequestWithType(info_type):
416        return lambda packet: L2capMatchers._information_request_with_type(packet, info_type)
417
418    @staticmethod
419    def InformationResponseExtendedFeatures(supports_ertm=None,
420                                            supports_streaming=None,
421                                            supports_fcs=None,
422                                            supports_fixed_channels=None):
423        return lambda packet: L2capMatchers._is_matching_information_response_extended_features(
424            packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)
425
426    @staticmethod
427    def _basic_frame(packet):
428        if packet is None:
429            return None
430        return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload)))
431
432    @staticmethod
433    def _basic_frame_with_fcs(packet):
434        if packet is None:
435            return None
436        return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload)))
437
438    @staticmethod
439    def _basic_frame_for(packet, scid):
440        frame = L2capMatchers._basic_frame(packet)
441        if frame.GetChannelId() != scid:
442            return None
443        return frame
444
445    @staticmethod
446    def _basic_frame_with_fcs_for(packet, scid):
447        frame = L2capMatchers._basic_frame(packet)
448        if frame.GetChannelId() != scid:
449            return None
450        frame = L2capMatchers._basic_frame_with_fcs(packet)
451        if frame is None:
452            return None
453        return frame
454
455    @staticmethod
456    def _information_frame(packet):
457        standard_frame = l2cap_packets.StandardFrameView(packet)
458        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
459            return None
460        return l2cap_packets.EnhancedInformationFrameView(standard_frame)
461
462    @staticmethod
463    def _information_frame_with_fcs(packet):
464        standard_frame = l2cap_packets.StandardFrameWithFcsView(packet)
465        if standard_frame is None:
466            return None
467        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
468            return None
469        return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame)
470
471    @staticmethod
472    def _information_start_frame(packet):
473        start_frame = L2capMatchers._information_frame(packet)
474        if start_frame is None:
475            return None
476        return l2cap_packets.EnhancedInformationStartFrameView(start_frame)
477
478    @staticmethod
479    def _information_start_frame_with_fcs(packet):
480        start_frame = L2capMatchers._information_frame_with_fcs(packet)
481        if start_frame is None:
482            return None
483        return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame)
484
485    @staticmethod
486    def _supervisory_frame(packet):
487        standard_frame = l2cap_packets.StandardFrameView(packet)
488        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
489            return None
490        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)
491
492    @staticmethod
493    def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False):
494        if fcs:
495            frame = L2capMatchers._information_frame_with_fcs(packet)
496        else:
497            frame = L2capMatchers._information_frame(packet)
498        if frame is None:
499            return False
500        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
501            return False
502        if payload is not None and frame.GetPayload().GetBytes() != payload:
503            return False
504        if f is not None and frame.GetF() != f:
505            return False
506        return True
507
508    @staticmethod
509    def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False):
510        if fcs:
511            frame = L2capMatchers._information_start_frame_with_fcs(packet)
512        else:
513            frame = L2capMatchers._information_start_frame(packet)
514        if frame is None:
515            return False
516        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
517            return False
518        if payload is not None and frame.GetPayload().GetBytes() != payload:
519            return False
520        if f is not None and frame.GetF() != f:
521            return False
522        return True
523
524    @staticmethod
525    def _is_matching_supervisory_frame(packet, req_seq, f, s, p):
526        frame = L2capMatchers._supervisory_frame(packet)
527        if frame is None:
528            return False
529        if req_seq is not None and frame.GetReqSeq() != req_seq:
530            return False
531        if f is not None and frame.GetF() != f:
532            return False
533        if s is not None and frame.GetS() != s:
534            return False
535        if p is not None and frame.GetP() != p:
536            return False
537        return True
538
539    @staticmethod
540    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
541        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
542        return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size
543
544    @staticmethod
545    def _control_frame(packet):
546        if packet.GetChannelId() != 1:
547            return None
548        return l2cap_packets.ControlView(packet.GetPayload())
549
550    @staticmethod
551    def _le_control_frame(packet):
552        if packet.GetChannelId() != 5:
553            return None
554        return l2cap_packets.LeControlView(packet.GetPayload())
555
556    @staticmethod
557    def control_frame_with_code(packet, code):
558        frame = L2capMatchers._control_frame(packet)
559        if frame is None or frame.GetCode() != code:
560            return None
561        return frame
562
563    @staticmethod
564    def le_control_frame_with_code(packet, code):
565        frame = L2capMatchers._le_control_frame(packet)
566        if frame is None or frame.GetCode() != code:
567            return None
568        return frame
569
570    @staticmethod
571    def _is_control_frame_with_code(packet, code):
572        return L2capMatchers.control_frame_with_code(packet, code) is not None
573
574    @staticmethod
575    def _is_le_control_frame_with_code(packet, code):
576        return L2capMatchers.le_control_frame_with_code(packet, code) is not None
577
578    @staticmethod
579    def _is_matching_connection_request(packet, psm):
580        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
581        if frame is None:
582            return False
583        request = l2cap_packets.ConnectionRequestView(frame)
584        return request.GetPsm() == psm
585
586    @staticmethod
587    def _is_matching_connection_response(packet, scid):
588        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
589        if frame is None:
590            return False
591        response = l2cap_packets.ConnectionResponseView(frame)
592        return response.GetSourceCid() == scid and response.GetResult(
593        ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid() != 0
594
595    @staticmethod
596    def _is_matching_configuration_request_with_cid(packet, cid=None):
597        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
598        if frame is None:
599            return False
600        request = l2cap_packets.ConfigurationRequestView(frame)
601        dcid = request.GetDestinationCid()
602        return cid is None or cid == dcid
603
604    @staticmethod
605    def _is_matching_configuration_request_with_ertm(packet):
606        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
607        if frame is None:
608            return False
609        request = l2cap_packets.ConfigurationRequestView(frame)
610        config_bytes = request.GetBytes()
611        # TODO(b/153189503): Use packet struct parser.
612        return b"\x04\x09\x03" in config_bytes
613
614    @staticmethod
615    def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS):
616        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
617        if frame is None:
618            return False
619        response = l2cap_packets.ConfigurationResponseView(frame)
620        return response.GetResult() == result
621
622    @staticmethod
623    def _is_matching_disconnection_request(packet, scid, dcid):
624        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
625        if frame is None:
626            return False
627        request = l2cap_packets.DisconnectionRequestView(frame)
628        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid
629
630    @staticmethod
631    def _is_matching_disconnection_response(packet, scid, dcid):
632        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE)
633        if frame is None:
634            return False
635        response = l2cap_packets.DisconnectionResponseView(frame)
636        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid
637
638    @staticmethod
639    def _is_matching_le_disconnection_response(packet, scid, dcid):
640        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE)
641        if frame is None:
642            return False
643        response = l2cap_packets.LeDisconnectionResponseView(frame)
644        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid
645
646    @staticmethod
647    def _is_matching_le_disconnection_request(packet, scid, dcid):
648        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST)
649        if frame is None:
650            return False
651        request = l2cap_packets.LeDisconnectionRequestView(frame)
652        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid
653
654    @staticmethod
655    def _is_matching_le_flow_control_credit(packet, cid):
656        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT)
657        if frame is None:
658            return False
659        request = l2cap_packets.LeFlowControlCreditView(frame)
660        return request.GetCid() == cid
661
662    @staticmethod
663    def _information_request_with_type(packet, info_type):
664        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST)
665        if frame is None:
666            return None
667        request = l2cap_packets.InformationRequestView(frame)
668        if request.GetInfoType() != info_type:
669            return None
670        return request
671
672    @staticmethod
673    def _information_response_with_type(packet, info_type):
674        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE)
675        if frame is None:
676            return None
677        response = l2cap_packets.InformationResponseView(frame)
678        if response.GetInfoType() != info_type:
679            return None
680        return response
681
682    @staticmethod
683    def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs,
684                                                            supports_fixed_channels):
685        frame = L2capMatchers._information_response_with_type(packet,
686                                                              InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
687        if frame is None:
688            return False
689        features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
690        if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm:
691            return False
692        if supports_streaming is not None and features.GetStreamingMode != supports_streaming:
693            return False
694        if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
695            return False
696        if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels:
697            return False
698        return True
699
700    @staticmethod
701    def _is_matching_connection_parameter_update_response(packet, result):
702        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE)
703        if frame is None:
704            return False
705        return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result
706
707    @staticmethod
708    def _is_matching_credit_based_connection_request(packet, psm):
709        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
710        if frame is None:
711            return False
712        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
713        return request.GetLePsm() == psm
714
715    @staticmethod
716    def _is_matching_credit_based_connection_response(packet, result):
717        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
718        if frame is None:
719            return False
720        response = l2cap_packets.LeCreditBasedConnectionResponseView(frame)
721        return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or
722                                                   response.GetDestinationCid() != 0)
723
724    @staticmethod
725    def LinkSecurityInterfaceCallbackEvent(type):
726        return lambda event: True if event.event_type == type else False
727
728
729class SecurityMatchers(object):
730
731    @staticmethod
732    def UiMsg(type, address=None):
733        return lambda event: True if event.message_type == type and (address == None or address == event.peer
734                                                                    ) else False
735
736    @staticmethod
737    def BondMsg(type, address=None, reason=None):
738        return lambda event: True if event.message_type == type and (address == None or address == event.peer) and (
739            reason == None or reason == event.reason) else False
740
741    @staticmethod
742    def HelperMsg(type, address=None):
743        return lambda event: True if event.message_type == type and (address == None or address == event.peer
744                                                                    ) else False
745
746
747class IsoMatchers(object):
748
749    @staticmethod
750    def Data(payload):
751        return lambda packet: packet.payload == payload
752
753    @staticmethod
754    def PacketPayloadWithMatchingCisHandle(cis_handle):
755        return lambda packet: None if cis_handle != packet.handle else packet
756