• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import bluetooth_packets_python3 as bt_packets
18import logging
19
20from bluetooth_packets_python3 import hci_packets
21from bluetooth_packets_python3.hci_packets import EventCode
22from bluetooth_packets_python3 import l2cap_packets
23from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
24from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
25from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
26from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
27from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
28
29
class HciMatchers(object):
    """Factories for predicates and extractors over raw HCI event messages.

    The public matcher factories return callables that take a message exposing
    a ``payload`` attribute.  The ``Extract*`` helpers take the raw packet bytes
    and return the parsed view, or ``None`` when the packet does not match.
    """

    @staticmethod
    def CommandComplete(opcode):
        """Return a predicate matching COMMAND_COMPLETE events for ``opcode``.

        Passing ``None`` as ``opcode`` matches any COMMAND_COMPLETE event.
        """
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
        """Return the CommandCompleteView for ``packet_bytes``, or ``None`` if it does not match."""
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_complete(packet_bytes, opcode=None):
        """Return True when ``packet_bytes`` is a COMMAND_COMPLETE event for ``opcode``."""
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_complete(packet_bytes, opcode=None):
        """Parse ``packet_bytes`` as COMMAND_COMPLETE and filter on ``opcode`` when given."""
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_COMPLETE)
        if event is None:
            return None
        complete = hci_packets.CommandCompleteView(event)
        if opcode is None or complete is None:
            return complete
        if complete.GetCommandOpCode() != opcode:
            return None
        return complete

    @staticmethod
    def CommandStatus(opcode=None):
        """Return a predicate matching COMMAND_STATUS events, optionally for one ``opcode``."""
        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
        """Return the CommandStatusView for ``packet_bytes``, or ``None`` if it does not match."""
        # Must use the COMMAND_STATUS extractor; the COMMAND_COMPLETE one would
        # reject every status event and return the wrong view type.
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_status(packet_bytes, opcode=None):
        """Return True when ``packet_bytes`` is a COMMAND_STATUS event for ``opcode``."""
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_status(packet_bytes, opcode=None):
        """Parse ``packet_bytes`` as COMMAND_STATUS and filter on ``opcode`` when given."""
        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
        if event is None:
            return None
        status = hci_packets.CommandStatusView(event)
        if opcode is None or status is None:
            return status
        if status.GetCommandOpCode() != opcode:
            return None
        return status

    @staticmethod
    def EventWithCode(event_code):
        """Return a predicate matching events whose event code equals ``event_code``."""
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)

    @staticmethod
    def ExtractEventWithCode(packet_bytes, event_code):
        """Return the EventView for ``packet_bytes``, or ``None`` if the code differs."""
        return HciMatchers._extract_matching_event(packet_bytes, event_code)

    @staticmethod
    def _is_matching_event(packet_bytes, event_code):
        """Return True when ``packet_bytes`` parses to an event with ``event_code``."""
        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None

    @staticmethod
    def _extract_matching_event(packet_bytes, event_code):
        """Parse ``packet_bytes`` into an EventView.

        Returns ``None`` when ``event_code`` is given and differs from the
        parsed event's code; ``event_code=None`` accepts any event.
        """
        event = hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if event is None:
            return None
        if event_code is not None and event.GetEventCode() != event_code:
            return None
        return event

    @staticmethod
    def LeEventWithCode(subevent_code):
        """Return a predicate matching LE meta events with the given ``subevent_code``."""
        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None

    @staticmethod
    def ExtractLeEventWithCode(packet_bytes, subevent_code):
        """Return the LeMetaEventView for ``packet_bytes``, or ``None`` if it does not match."""
        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)

    @staticmethod
    def _extract_matching_le_event(packet_bytes, subevent_code):
        """Parse an LE_META_EVENT and return it only if its subevent code matches."""
        inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)
        if inner_event is None:
            return None
        event = hci_packets.LeMetaEventView(inner_event)
        if event.GetSubeventCode() != subevent_code:
            return None
        return event

    @staticmethod
    def LeConnectionComplete():
        """Return a predicate matching LE (enhanced) connection complete events."""
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None

    @staticmethod
    def ExtractLeConnectionComplete(packet_bytes):
        """Return the LE (enhanced) connection complete view, or ``None``."""
        return HciMatchers._extract_le_connection_complete(packet_bytes)

    @staticmethod
    def _extract_le_connection_complete(packet_bytes):
        """Try CONNECTION_COMPLETE first, then ENHANCED_CONNECTION_COMPLETE.

        The returned view type depends on which subevent matched; ``None`` is
        returned when neither does.
        """
        inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeConnectionCompleteView(inner_event)

        inner_event = HciMatchers._extract_matching_le_event(packet_bytes,
                                                             hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
        if inner_event is not None:
            return hci_packets.LeEnhancedConnectionCompleteView(inner_event)

        return None

    @staticmethod
    def LogEventCode():
        """Return a callable that logs the event code of ``event.payload``.

        The callable returns the result of ``logging.info`` (``None``), so it
        never reports a match; it is only useful for its logging side effect.
        """
        return lambda event: logging.info("Received event: %x" % hci_packets.EventView(
            bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode())

    # The factories below evaluate the event-code check against the message
    # payload directly. Returning ``lambda event: EventWithCode(...)`` would hand
    # back a function object (always truthy) instead of a boolean, so every
    # event would appear to match. The EventCode lookup stays inside the lambda
    # so it is resolved only when the predicate is called.

    @staticmethod
    def LinkKeyRequest():
        """Return a predicate matching LINK_KEY_REQUEST events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.LINK_KEY_REQUEST)

    @staticmethod
    def IoCapabilityRequest():
        """Return a predicate matching IO_CAPABILITY_REQUEST events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.IO_CAPABILITY_REQUEST)

    @staticmethod
    def IoCapabilityResponse():
        """Return a predicate matching IO_CAPABILITY_RESPONSE events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.IO_CAPABILITY_RESPONSE)

    @staticmethod
    def UserPasskeyNotification():
        """Return a predicate matching USER_PASSKEY_NOTIFICATION events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.USER_PASSKEY_NOTIFICATION)

    @staticmethod
    def UserPasskeyRequest():
        """Return a predicate matching USER_PASSKEY_REQUEST events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.USER_PASSKEY_REQUEST)

    @staticmethod
    def UserConfirmationRequest():
        """Return a predicate matching USER_CONFIRMATION_REQUEST events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.USER_CONFIRMATION_REQUEST)

    @staticmethod
    def RemoteHostSupportedFeaturesNotification():
        """Return a predicate matching REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION events."""
        return lambda event: HciMatchers._is_matching_event(event.payload,
                                                            EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)

    @staticmethod
    def LinkKeyNotification():
        """Return a predicate matching LINK_KEY_NOTIFICATION events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.LINK_KEY_NOTIFICATION)

    @staticmethod
    def SimplePairingComplete():
        """Return a predicate matching SIMPLE_PAIRING_COMPLETE events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.SIMPLE_PAIRING_COMPLETE)

    @staticmethod
    def Disconnect():
        """Return a predicate matching events with code ``EventCode.DISCONNECT``."""
        # NOTE(review): confirm EventCode actually defines DISCONNECT; the lookup
        # is deferred until the predicate is invoked.
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.DISCONNECT)

    @staticmethod
    def DisconnectionComplete():
        """Return a predicate matching DISCONNECTION_COMPLETE events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.DISCONNECTION_COMPLETE)

    @staticmethod
    def RemoteOobDataRequest():
        """Return a predicate matching REMOTE_OOB_DATA_REQUEST events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.REMOTE_OOB_DATA_REQUEST)

    @staticmethod
    def PinCodeRequest():
        """Return a predicate matching PIN_CODE_REQUEST events."""
        return lambda event: HciMatchers._is_matching_event(event.payload, EventCode.PIN_CODE_REQUEST)

    @staticmethod
    def LoopbackOf(packet):
        """Return a predicate matching the LoopbackCommand wrapping of ``packet`` byte-for-byte."""
        return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet))

    @staticmethod
    def Exactly(packet):
        """Return a predicate that is True when ``event.payload`` equals ``packet`` serialized.

        ``packet.Serialize()`` is called once, when the matcher is built.
        """
        data = bytes(packet.Serialize())
        return lambda event: data == event.payload
208
209
class AdvertisingMatchers(object):
    """Predicate factories for advertising callback messages.

    Optional filters left as ``None`` are ignored; only the filters that are
    supplied must equal the corresponding attribute on the event.
    """

    @staticmethod
    def AdvertisingCallbackMsg(type, advertiser_id=None, status=None, data=None):
        """Return a predicate over events exposing ``message_type``, ``advertiser_id``, ``status`` and ``data``.

        Args:
            type: required value of ``event.message_type`` (name shadows the
                builtin but is kept for keyword-argument compatibility).
            advertiser_id: optional required value of ``event.advertiser_id``.
            status: optional required value of ``event.status``.
            data: optional required value of ``event.data``.

        Returns:
            A callable returning ``True``/``False`` for a given event.
        """

        def matches(event):
            # Identity comparison with None avoids invoking a custom __eq__
            # on the filter value just to detect "filter not set".
            return bool(event.message_type == type and
                        (advertiser_id is None or advertiser_id == event.advertiser_id) and
                        (status is None or status == event.status) and (data is None or data == event.data))

        return matches

    @staticmethod
    def AddressMsg(type, advertiser_id=None, address=None):
        """Return a predicate over events exposing ``message_type``, ``advertiser_id`` and ``address``.

        Args:
            type: required value of ``event.message_type``.
            advertiser_id: optional required value of ``event.advertiser_id``.
            address: optional required value of ``event.address``.

        Returns:
            A callable returning ``True``/``False`` for a given event.
        """

        def matches(event):
            return bool(event.message_type == type and
                        (advertiser_id is None or advertiser_id == event.advertiser_id) and
                        (address is None or address == event.address))

        return matches
221
222
class ScanningMatchers(object):
    """Predicate factories for scanning callback messages."""

    @staticmethod
    def ScanningCallbackMsg(type, status=None, data=None):
        """Return a predicate over events exposing ``message_type``, ``status`` and ``data``.

        Args:
            type: required value of ``event.message_type`` (name shadows the
                builtin but is kept for keyword-argument compatibility).
            status: optional required value of ``event.status``; ignored when ``None``.
            data: optional required value of ``event.data``; ignored when ``None``.

        Returns:
            A callable returning ``True``/``False`` for a given event.
        """

        def matches(event):
            return bool(event.message_type == type and (status is None or status == event.status) and
                        (data is None or data == event.data))

        return matches
229
230
class NeighborMatchers(object):
    """Predicate factories for inquiry-result HCI events, filtered by address.

    Each public factory returns a callable that reads ``msg.packet``.
    """

    @staticmethod
    def InquiryResult(address):
        """Build a predicate for INQUIRY_RESULT events containing ``address``."""

        def predicate(msg):
            return NeighborMatchers._is_matching_inquiry_result(msg.packet, address)

        return predicate

    @staticmethod
    def _is_matching_inquiry_result(packet, address):
        """Return True if any response in the INQUIRY_RESULT has ``bd_addr`` equal to ``address``."""
        base_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT)
        if base_event is not None:
            view = hci_packets.InquiryResultView(base_event)
            if view is not None:
                for response in view.GetResponses():
                    if address == response.bd_addr:
                        return True
        return False

    @staticmethod
    def InquiryResultwithRssi(address):
        """Build a predicate for INQUIRY_RESULT_WITH_RSSI events containing ``address``."""

        def predicate(msg):
            return NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)

        return predicate

    @staticmethod
    def _is_matching_inquiry_result_with_rssi(packet, address):
        """Return True if any response in the RSSI inquiry result has ``address`` equal to ``address``."""
        base_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT_WITH_RSSI)
        if base_event is not None:
            view = hci_packets.InquiryResultWithRssiView(base_event)
            if view is not None:
                for response in view.GetResponses():
                    if address == response.address:
                        return True
        return False

    @staticmethod
    def ExtendedInquiryResult(address):
        """Build a predicate for EXTENDED_INQUIRY_RESULT events reporting ``address``."""

        def predicate(msg):
            return NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)

        return predicate

    @staticmethod
    def _is_matching_extended_inquiry_result(packet, address):
        """Compare ``address`` with the address carried by the extended inquiry result."""
        base_event = HciMatchers.ExtractEventWithCode(packet, EventCode.EXTENDED_INQUIRY_RESULT)
        if base_event is not None:
            view = hci_packets.ExtendedInquiryResultView(base_event)
            if view is not None:
                return address == view.GetAddress()
        return False
276
277
class L2capMatchers(object):
    """Factories for predicates and extractors over L2CAP frames.

    Public ``CamelCase`` factories return callables applied to a packet (either
    a parsed frame view or, for the ``PacketPayload*``/``Extract*`` helpers, a
    message object with a ``payload`` attribute).  Private ``_is_matching_*``
    helpers return booleans; the remaining private helpers return a parsed view
    or ``None`` when the packet does not match.
    """

    @staticmethod
    def ConnectionRequest(psm):
        """Return a predicate matching CONNECTION_REQUEST frames for ``psm``."""
        return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm)

    @staticmethod
    def ConnectionResponse(scid):
        """Return a predicate matching successful CONNECTION_RESPONSE frames for ``scid``."""
        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)

    @staticmethod
    def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS):
        """Return a predicate matching CONFIGURATION_RESPONSE frames with ``result``."""
        return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result)

    @staticmethod
    def ConfigurationRequest(cid=None):
        """Return a predicate matching CONFIGURATION_REQUEST frames, optionally for destination ``cid``."""
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid)

    @staticmethod
    def ConfigurationRequestWithErtm():
        """Return a predicate matching CONFIGURATION_REQUEST frames that carry the ERTM byte pattern."""
        return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet)

    @staticmethod
    def ConfigurationRequestView(dcid):
        """Return a predicate over an already-parsed request view, comparing its destination CID."""
        return lambda request_view: request_view.GetDestinationCid() == dcid

    @staticmethod
    def DisconnectionRequest(scid, dcid):
        """Return a predicate matching DISCONNECTION_REQUEST frames for ``scid``/``dcid``."""
        return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid)

    @staticmethod
    def DisconnectionResponse(scid, dcid):
        """Return a predicate matching DISCONNECTION_RESPONSE frames for ``scid``/``dcid``."""
        return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)

    @staticmethod
    def EchoResponse():
        """Return a predicate matching ECHO_RESPONSE control frames."""
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE)

    @staticmethod
    def CommandReject():
        """Return a predicate matching COMMAND_REJECT control frames."""
        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)

    @staticmethod
    def LeCommandReject():
        """Return a predicate matching LE COMMAND_REJECT control frames."""
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def LeConnectionParameterUpdateRequest():
        """Return a predicate matching LE CONNECTION_PARAMETER_UPDATE_REQUEST frames."""
        return lambda packet: L2capMatchers._is_le_control_frame_with_code(
            packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST)

    @staticmethod
    def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED):
        """Return a predicate matching LE CONNECTION_PARAMETER_UPDATE_RESPONSE frames with ``result``."""
        return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result)

    @staticmethod
    def CreditBasedConnectionRequest(psm):
        """Return a predicate matching LE_CREDIT_BASED_CONNECTION_REQUEST frames for ``psm``."""
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)

    @staticmethod
    def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS):
        """Return a predicate matching LE_CREDIT_BASED_CONNECTION_RESPONSE frames with ``result``."""
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result)

    @staticmethod
    def CreditBasedConnectionResponseUsedCid():
        """Return a predicate matching either a SOURCE_CID_ALREADY_ALLOCATED response or an LE COMMAND_REJECT."""
        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(
            packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED
        ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)

    @staticmethod
    def LeDisconnectionRequest(scid, dcid):
        """Return a predicate matching LE DISCONNECTION_REQUEST frames for ``scid``/``dcid``."""
        return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)

    @staticmethod
    def LeDisconnectionResponse(scid, dcid):
        """Return a predicate matching LE DISCONNECTION_RESPONSE frames for ``scid``/``dcid``."""
        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)

    @staticmethod
    def LeFlowControlCredit(cid):
        """Return a predicate matching LE_FLOW_CONTROL_CREDIT frames for ``cid``."""
        return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid)

    @staticmethod
    def SFrame(req_seq=None, f=None, s=None, p=None):
        """Return a predicate matching supervisory frames; ``None`` fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)

    @staticmethod
    def IFrame(tx_seq=None, payload=None, f=None):
        """Return a predicate matching information frames (no FCS); ``None`` fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False)

    @staticmethod
    def IFrameWithFcs(tx_seq=None, payload=None, f=None):
        """Return a predicate matching information frames with FCS; ``None`` fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True)

    @staticmethod
    def IFrameStart(tx_seq=None, payload=None, f=None):
        """Return a predicate matching information start frames (no FCS); ``None`` fields are not checked."""
        return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False)

    @staticmethod
    def Data(payload):
        """Return a predicate that is True when the frame payload bytes equal ``payload``."""
        return lambda packet: packet.GetPayload().GetBytes() == payload

    @staticmethod
    def FirstLeIFrame(payload, sdu_size):
        """Return a predicate matching a first LE information frame with ``payload`` and ``sdu_size``."""
        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)

    # this is a hack - should be removed
    @staticmethod
    def PartialData(payload):
        """Return a predicate that is True when ``payload`` occurs within the frame payload bytes."""
        return lambda packet: payload in packet.GetPayload().GetBytes()

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadRawData(payload):
        """Return a predicate that is True when ``payload`` occurs within ``packet.payload``."""
        return lambda packet: payload in packet.payload

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingPsm(psm):
        """Return a selector yielding the packet when ``packet.psm`` equals ``psm``, else ``None``."""
        return lambda packet: None if psm != packet.psm else packet

    # this is a hack - should be removed
    @staticmethod
    def PacketPayloadWithMatchingCid(cid):
        """Return a selector yielding the packet when ``packet.fixed_cid`` equals ``cid``, else ``None``."""
        return lambda packet: None if cid != packet.fixed_cid else packet

    @staticmethod
    def ExtractBasicFrame(scid):
        """Return an extractor producing the BasicFrameView on channel ``scid``, else ``None``."""
        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)

    @staticmethod
    def ExtractBasicFrameWithFcs(scid):
        """Return an extractor producing the BasicFrameWithFcsView on channel ``scid``, else ``None``."""
        return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid)

    @staticmethod
    def InformationRequestWithType(info_type):
        """Return an extractor producing the InformationRequestView of ``info_type``, else ``None``."""
        return lambda packet: L2capMatchers._information_request_with_type(packet, info_type)

    @staticmethod
    def InformationResponseExtendedFeatures(supports_ertm=None,
                                            supports_streaming=None,
                                            supports_fcs=None,
                                            supports_fixed_channels=None):
        """Return a predicate matching extended-features information responses.

        Each argument left as ``None`` is not checked; otherwise the
        corresponding feature getter must equal the given value.
        """
        return lambda packet: L2capMatchers._is_matching_information_response_extended_features(
            packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)

    @staticmethod
    def _basic_frame(packet):
        """Parse ``packet.payload`` as a BasicFrameView; ``None`` in gives ``None`` out."""
        if packet is None:
            return None
        return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _basic_frame_with_fcs(packet):
        """Parse ``packet.payload`` as a BasicFrameWithFcsView; ``None`` in gives ``None`` out."""
        if packet is None:
            return None
        return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload)))

    @staticmethod
    def _basic_frame_for(packet, scid):
        """Return the basic frame only if its channel id equals ``scid``."""
        frame = L2capMatchers._basic_frame(packet)
        # _basic_frame returns None for a None packet; guard before dereferencing.
        if frame is None or frame.GetChannelId() != scid:
            return None
        return frame

    @staticmethod
    def _basic_frame_with_fcs_for(packet, scid):
        """Return the FCS basic frame only if the basic frame's channel id equals ``scid``."""
        frame = L2capMatchers._basic_frame(packet)
        # The channel id is checked on the plain basic-frame parse first.
        if frame is None or frame.GetChannelId() != scid:
            return None
        return L2capMatchers._basic_frame_with_fcs(packet)

    @staticmethod
    def _information_frame(packet):
        """Return an EnhancedInformationFrameView, or ``None`` if the frame type is not I_FRAME."""
        standard_frame = l2cap_packets.StandardFrameView(packet)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameView(standard_frame)

    @staticmethod
    def _information_frame_with_fcs(packet):
        """Return an EnhancedInformationFrameWithFcsView, or ``None`` if the frame type is not I_FRAME."""
        standard_frame = l2cap_packets.StandardFrameWithFcsView(packet)
        if standard_frame is None:
            return None
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
            return None
        return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame)

    @staticmethod
    def _information_start_frame(packet):
        """Return an EnhancedInformationStartFrameView, or ``None`` if not an information frame."""
        start_frame = L2capMatchers._information_frame(packet)
        if start_frame is None:
            return None
        return l2cap_packets.EnhancedInformationStartFrameView(start_frame)

    @staticmethod
    def _information_start_frame_with_fcs(packet):
        """Return an EnhancedInformationStartFrameWithFcsView, or ``None`` if not an information frame."""
        start_frame = L2capMatchers._information_frame_with_fcs(packet)
        if start_frame is None:
            return None
        return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame)

    @staticmethod
    def _supervisory_frame(packet):
        """Return an EnhancedSupervisoryFrameView, or ``None`` if the frame type is not S_FRAME."""
        standard_frame = l2cap_packets.StandardFrameView(packet)
        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
            return None
        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)

    @staticmethod
    def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False):
        """Check an information frame against the non-``None`` fields among ``tx_seq``, ``payload``, ``f``."""
        if fcs:
            frame = L2capMatchers._information_frame_with_fcs(packet)
        else:
            frame = L2capMatchers._information_frame(packet)
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload().GetBytes() != payload:
            return False
        if f is not None and frame.GetF() != f:
            return False
        return True

    @staticmethod
    def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False):
        """Check an information start frame against the non-``None`` fields among ``tx_seq``, ``payload``, ``f``."""
        if fcs:
            frame = L2capMatchers._information_start_frame_with_fcs(packet)
        else:
            frame = L2capMatchers._information_start_frame(packet)
        if frame is None:
            return False
        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
            return False
        if payload is not None and frame.GetPayload().GetBytes() != payload:
            return False
        if f is not None and frame.GetF() != f:
            return False
        return True

    @staticmethod
    def _is_matching_supervisory_frame(packet, req_seq, f, s, p):
        """Check a supervisory frame against the non-``None`` fields among ``req_seq``, ``f``, ``s``, ``p``."""
        frame = L2capMatchers._supervisory_frame(packet)
        if frame is None:
            return False
        if req_seq is not None and frame.GetReqSeq() != req_seq:
            return False
        if f is not None and frame.GetF() != f:
            return False
        if s is not None and frame.GetS() != s:
            return False
        if p is not None and frame.GetP() != p:
            return False
        return True

    @staticmethod
    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
        """Return True when the first LE I-frame has the given payload bytes and SDU length."""
        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
        return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size

    @staticmethod
    def _control_frame(packet):
        """Return the ControlView of the payload when the channel id is 1, else ``None``."""
        if packet.GetChannelId() != 1:
            return None
        return l2cap_packets.ControlView(packet.GetPayload())

    @staticmethod
    def _le_control_frame(packet):
        """Return the LeControlView of the payload when the channel id is 5, else ``None``."""
        if packet.GetChannelId() != 5:
            return None
        return l2cap_packets.LeControlView(packet.GetPayload())

    @staticmethod
    def control_frame_with_code(packet, code):
        """Return the control frame if its command code equals ``code``, else ``None``."""
        frame = L2capMatchers._control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def le_control_frame_with_code(packet, code):
        """Return the LE control frame if its command code equals ``code``, else ``None``."""
        frame = L2capMatchers._le_control_frame(packet)
        if frame is None or frame.GetCode() != code:
            return None
        return frame

    @staticmethod
    def _is_control_frame_with_code(packet, code):
        """Boolean form of ``control_frame_with_code``."""
        return L2capMatchers.control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_le_control_frame_with_code(packet, code):
        """Boolean form of ``le_control_frame_with_code``."""
        return L2capMatchers.le_control_frame_with_code(packet, code) is not None

    @staticmethod
    def _is_matching_connection_request(packet, psm):
        """Return True for a CONNECTION_REQUEST whose PSM equals ``psm``."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConnectionRequestView(frame)
        return request.GetPsm() == psm

    @staticmethod
    def _is_matching_connection_response(packet, scid):
        """Return True for a SUCCESS CONNECTION_RESPONSE for ``scid`` with a non-zero destination CID."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConnectionResponseView(frame)
        return (response.GetSourceCid() == scid and response.GetResult() == ConnectionResponseResult.SUCCESS and
                response.GetDestinationCid() != 0)

    @staticmethod
    def _is_matching_configuration_request_with_cid(packet, cid=None):
        """Return True for a CONFIGURATION_REQUEST; when ``cid`` is given it must equal the destination CID."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        dcid = request.GetDestinationCid()
        return cid is None or cid == dcid

    @staticmethod
    def _is_matching_configuration_request_with_ertm(packet):
        """Return True for a CONFIGURATION_REQUEST whose raw bytes contain ``04 09 03``."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.ConfigurationRequestView(frame)
        config_bytes = request.GetBytes()
        # TODO(b/153189503): Use packet struct parser.
        # Presumably option type 0x04, length 9, mode 0x03 (ERTM) - verify against the L2CAP spec.
        return b"\x04\x09\x03" in config_bytes

    @staticmethod
    def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS):
        """Return True for a CONFIGURATION_RESPONSE whose result equals ``result``."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.ConfigurationResponseView(frame)
        return response.GetResult() == result

    @staticmethod
    def _is_matching_disconnection_request(packet, scid, dcid):
        """Return True for a DISCONNECTION_REQUEST with the given source and destination CIDs."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.DisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_disconnection_response(packet, scid, dcid):
        """Return True for a DISCONNECTION_RESPONSE with the given source and destination CIDs."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.DisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_disconnection_response(packet, scid, dcid):
        """Return True for an LE DISCONNECTION_RESPONSE with the given source and destination CIDs."""
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeDisconnectionResponseView(frame)
        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_disconnection_request(packet, scid, dcid):
        """Return True for an LE DISCONNECTION_REQUEST with the given source and destination CIDs."""
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeDisconnectionRequestView(frame)
        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid

    @staticmethod
    def _is_matching_le_flow_control_credit(packet, cid):
        """Return True for an LE_FLOW_CONTROL_CREDIT frame whose CID equals ``cid``."""
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT)
        if frame is None:
            return False
        request = l2cap_packets.LeFlowControlCreditView(frame)
        return request.GetCid() == cid

    @staticmethod
    def _information_request_with_type(packet, info_type):
        """Return the InformationRequestView if its info type equals ``info_type``, else ``None``."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST)
        if frame is None:
            return None
        request = l2cap_packets.InformationRequestView(frame)
        if request.GetInfoType() != info_type:
            return None
        return request

    @staticmethod
    def _information_response_with_type(packet, info_type):
        """Return the InformationResponseView if its info type equals ``info_type``, else ``None``."""
        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE)
        if frame is None:
            return None
        response = l2cap_packets.InformationResponseView(frame)
        if response.GetInfoType() != info_type:
            return None
        return response

    @staticmethod
    def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs,
                                                            supports_fixed_channels):
        """Check an EXTENDED_FEATURES_SUPPORTED information response against the non-``None`` expectations."""
        frame = L2capMatchers._information_response_with_type(packet,
                                                              InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
        if frame is None:
            return False
        features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
        if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm:
            return False
        # The getter must be *called*; comparing the bound method itself to the
        # expected value is always unequal and would reject every response.
        if supports_streaming is not None and features.GetStreamingMode() != supports_streaming:
            return False
        if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
            return False
        if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels:
            return False
        return True

    @staticmethod
    def _is_matching_connection_parameter_update_response(packet, result):
        """Return True for an LE CONNECTION_PARAMETER_UPDATE_RESPONSE whose result equals ``result``."""
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE)
        if frame is None:
            return False
        return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result

    @staticmethod
    def _is_matching_credit_based_connection_request(packet, psm):
        """Return True for an LE_CREDIT_BASED_CONNECTION_REQUEST whose LE PSM equals ``psm``."""
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
        if frame is None:
            return False
        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
        return request.GetLePsm() == psm

    @staticmethod
    def _is_matching_credit_based_connection_response(packet, result):
        """Return True for an LE_CREDIT_BASED_CONNECTION_RESPONSE with ``result``.

        When ``result`` is SUCCESS the destination CID must additionally be non-zero.
        """
        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
        if frame is None:
            return False
        response = l2cap_packets.LeCreditBasedConnectionResponseView(frame)
        return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or
                                                   response.GetDestinationCid() != 0)

    @staticmethod
    def LinkSecurityInterfaceCallbackEvent(type):
        """Return a predicate that is True when ``event.event_type`` equals ``type``."""
        return lambda event: bool(event.event_type == type)
724
725
class SecurityMatchers(object):
    """Predicate factories for security module messages.

    Every factory requires ``event.message_type`` to equal ``type``; optional
    filters left as ``None`` are ignored.  The ``type`` parameter name shadows
    the builtin but is kept for keyword-argument compatibility.
    """

    @staticmethod
    def UiMsg(type, address=None):
        """Return a predicate matching UI messages of ``type``, optionally from peer ``address``."""

        def matches(event):
            return bool(event.message_type == type and (address is None or address == event.peer))

        return matches

    @staticmethod
    def BondMsg(type, address=None, reason=None):
        """Return a predicate matching bond messages of ``type``, optionally filtered by peer and reason."""

        def matches(event):
            return bool(event.message_type == type and (address is None or address == event.peer) and
                        (reason is None or reason == event.reason))

        return matches

    @staticmethod
    def HelperMsg(type, address=None):
        """Return a predicate matching helper messages of ``type``, optionally from peer ``address``."""

        def matches(event):
            return bool(event.message_type == type and (address is None or address == event.peer))

        return matches
739
740
class IsoMatchers(object):
    """Matcher factories for ISO packets."""

    @staticmethod
    def Data(payload):
        """Build a predicate comparing ``packet.payload`` with ``payload`` for equality."""

        def has_payload(packet):
            return packet.payload == payload

        return has_payload

    @staticmethod
    def PacketPayloadWithMatchingCisHandle(cis_handle):
        """Build a selector that passes the packet through when its ``handle`` equals ``cis_handle``.

        The selector returns ``None`` for any other handle.
        """

        def select(packet):
            if cis_handle != packet.handle:
                return None
            return packet

        return select
750