• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2#   Copyright 2019 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16import time
17from datetime import timedelta
18from mobly import asserts
19
20from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
21from cert.event_asserts import EventAsserts
22from cert.event_callback_stream import EventCallbackStream
23from facade import common_pb2
24from facade import rootservice_pb2 as facade_rootservice
25from google.protobuf import empty_pb2 as empty_proto
26from l2cap.classic import facade_pb2 as l2cap_facade_pb2
27from neighbor.facade import facade_pb2 as neighbor_facade
28from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
29import bluetooth_packets_python3 as bt_packets
30from bluetooth_packets_python3 import hci_packets, l2cap_packets
31
32# Assemble a sample packet. TODO: Use RawBuilder
33SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
34
35
36class L2capTest(GdFacadeOnlyBaseTestClass):
37
38    def setup_test(self):
39        self.device_under_test.rootservice.StartStack(
40            facade_rootservice.StartStackRequest(
41                module_under_test=facade_rootservice.BluetoothModule.Value(
42                    'L2CAP'),))
43        self.cert_device.rootservice.StartStack(
44            facade_rootservice.StartStackRequest(
45                module_under_test=facade_rootservice.BluetoothModule.Value(
46                    'HCI_INTERFACES'),))
47
48        self.device_under_test.wait_channel_ready()
49        self.cert_device.wait_channel_ready()
50
51        self.device_under_test.address = self.device_under_test.hci_controller.GetMacAddress(
52            empty_proto.Empty()).address
53        cert_address = self.cert_device.controller_read_only_property.ReadLocalAddress(
54            empty_proto.Empty()).address
55        self.cert_device.address = cert_address
56        self.dut_address = common_pb2.BluetoothAddress(
57            address=self.device_under_test.address)
58        self.cert_address = common_pb2.BluetoothAddress(
59            address=self.cert_device.address)
60
61        self.device_under_test.neighbor.EnablePageScan(
62            neighbor_facade.EnableMsg(enabled=True))
63
64        self.cert_acl_handle = 0
65
66        self.on_connection_request = None
67        self.on_connection_response = None
68        self.on_configuration_request = None
69        self.on_configuration_response = None
70        self.on_disconnection_request = None
71        self.on_disconnection_response = None
72        self.on_information_request = None
73        self.on_information_response = None
74
75        self.scid_to_dcid = {}
76        self.ertm_tx_window_size = 10
77        self.ertm_max_transmit = 20
78
79    def _on_connection_request_default(self, l2cap_control_view):
80        connection_request_view = l2cap_packets.ConnectionRequestView(
81            l2cap_control_view)
82        sid = connection_request_view.GetIdentifier()
83        cid = connection_request_view.GetSourceCid()
84
85        self.scid_to_dcid[cid] = cid
86
87        connection_response = l2cap_packets.ConnectionResponseBuilder(
88            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
89            l2cap_packets.ConnectionResponseStatus.
90            NO_FURTHER_INFORMATION_AVAILABLE)
91        connection_response_l2cap = l2cap_packets.BasicFrameBuilder(
92            1, connection_response)
93        self.cert_device.hci_acl_manager.SendAclData(
94            acl_manager_facade.AclData(
95                handle=self.cert_acl_handle,
96                payload=bytes(connection_response_l2cap.Serialize())))
97        return True
98
99    def _on_connection_response_default(self, l2cap_control_view):
100        connection_response_view = l2cap_packets.ConnectionResponseView(
101            l2cap_control_view)
102        sid = connection_response_view.GetIdentifier()
103        scid = connection_response_view.GetSourceCid()
104        dcid = connection_response_view.GetDestinationCid()
105        self.scid_to_dcid[scid] = dcid
106
107        config_request = l2cap_packets.ConfigurationRequestBuilder(
108            sid + 1, dcid, l2cap_packets.Continuation.END, [])
109        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
110            1, config_request)
111        self.cert_device.hci_acl_manager.SendAclData(
112            acl_manager_facade.AclData(
113                handle=self.cert_acl_handle,
114                payload=bytes(config_request_l2cap.Serialize())))
115        return True
116
117    def _on_connection_response_use_ertm(self, l2cap_control_view):
118        connection_response_view = l2cap_packets.ConnectionResponseView(
119            l2cap_control_view)
120        sid = connection_response_view.GetIdentifier()
121        scid = connection_response_view.GetSourceCid()
122        dcid = connection_response_view.GetDestinationCid()
123        self.scid_to_dcid[scid] = dcid
124
125        # FIXME: This doesn't work!
126        ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
127        )
128        ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
129        ertm_option.tx_window_size = self.ertm_tx_window_size
130        ertm_option.max_transmit = self.ertm_max_transmit
131        ertm_option.retransmission_time_out = 2000
132        ertm_option.monitor_time_out = 12000
133        ertm_option.maximum_pdu_size = 1010
134
135        options = [ertm_option]
136
137        config_request = l2cap_packets.ConfigurationRequestBuilder(
138            sid + 1, dcid, l2cap_packets.Continuation.END, options)
139
140        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
141            1, config_request)
142
143        config_packet = bytearray([
144            0x1a,
145            0x00,
146            0x01,
147            0x00,
148            0x04,
149            sid + 1,
150            0x16,
151            0x00,
152            dcid & 0xff,
153            dcid >> 8,
154            0x00,
155            0x00,
156            0x01,
157            0x02,
158            0xa0,
159            0x02,  # MTU
160            0x04,
161            0x09,
162            0x03,
163            self.ertm_tx_window_size,
164            self.ertm_max_transmit,
165            0xd0,
166            0x07,
167            0xe0,
168            0x2e,
169            0xf2,
170            0x03,  # ERTM
171            0x05,
172            0x01,
173            0x00  # FCS
174        ])
175
176        self.cert_device.hci_acl_manager.SendAclData(
177            acl_manager_facade.AclData(
178                handle=self.cert_acl_handle, payload=bytes(config_packet)))
179        return True
180
181    def _on_connection_response_use_ertm_and_fcs(self, l2cap_control_view):
182        connection_response_view = l2cap_packets.ConnectionResponseView(
183            l2cap_control_view)
184        sid = connection_response_view.GetIdentifier()
185        scid = connection_response_view.GetSourceCid()
186        dcid = connection_response_view.GetDestinationCid()
187        self.scid_to_dcid[scid] = dcid
188
189        # FIXME: This doesn't work!
190        ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
191        )
192        ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
193        ertm_option.tx_window_size = self.ertm_tx_window_size
194        ertm_option.max_transmit = self.ertm_max_transmit
195        ertm_option.retransmission_time_out = 2000
196        ertm_option.monitor_time_out = 12000
197        ertm_option.maximum_pdu_size = 1010
198
199        options = [ertm_option]
200
201        config_request = l2cap_packets.ConfigurationRequestBuilder(
202            sid + 1, dcid, l2cap_packets.Continuation.END, options)
203
204        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
205            1, config_request)
206
207        config_packet = bytearray([
208            0x1a,
209            0x00,
210            0x01,
211            0x00,
212            0x04,
213            sid + 1,
214            0x16,
215            0x00,
216            dcid & 0xff,
217            dcid >> 8,
218            0x00,
219            0x00,
220            0x01,
221            0x02,
222            0xa0,
223            0x02,  # MTU
224            0x04,
225            0x09,
226            0x03,
227            self.ertm_tx_window_size,
228            self.ertm_max_transmit,
229            0xd0,
230            0x07,
231            0xe0,
232            0x2e,
233            0xf2,
234            0x03,  # ERTM
235            0x05,
236            0x01,
237            0x01  # FCS
238        ])
239
240        self.cert_device.hci_acl_manager.SendAclData(
241            acl_manager_facade.AclData(
242                handle=self.cert_acl_handle, payload=bytes(config_packet)))
243        return True
244
245    def _on_configuration_request_default(self, l2cap_control_view):
246        configuration_request = l2cap_packets.ConfigurationRequestView(
247            l2cap_control_view)
248        sid = configuration_request.GetIdentifier()
249        dcid = configuration_request.GetDestinationCid()
250        config_response = l2cap_packets.ConfigurationResponseBuilder(
251            sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END,
252            l2cap_packets.ConfigurationResponseResult.SUCCESS, [])
253        config_response_l2cap = l2cap_packets.BasicFrameBuilder(
254            1, config_response)
255        self.cert_send_b_frame(config_response_l2cap)
256
257    def _on_configuration_response_default(self, l2cap_control_view):
258        configuration_response = l2cap_packets.ConfigurationResponseView(
259            l2cap_control_view)
260        sid = configuration_response.GetIdentifier()
261
262    def _on_disconnection_request_default(self, l2cap_control_view):
263        disconnection_request = l2cap_packets.DisconnectionRequestView(
264            l2cap_control_view)
265        sid = disconnection_request.GetIdentifier()
266        scid = disconnection_request.GetSourceCid()
267        dcid = disconnection_request.GetDestinationCid()
268        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(
269            sid, dcid, scid)
270        disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder(
271            1, disconnection_response)
272        self.cert_device.hci_acl_manager.SendAclData(
273            acl_manager_facade.AclData(
274                handle=self.cert_acl_handle,
275                payload=bytes(disconnection_response_l2cap.Serialize())))
276
277    def _on_disconnection_response_default(self, l2cap_control_view):
278        disconnection_response = l2cap_packets.DisconnectionResponseView(
279            l2cap_control_view)
280
281    def _on_information_request_default(self, l2cap_control_view):
282        information_request = l2cap_packets.InformationRequestView(
283            l2cap_control_view)
284        sid = information_request.GetIdentifier()
285        information_type = information_request.GetInfoType()
286        if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU:
287            response = l2cap_packets.InformationResponseConnectionlessMtuBuilder(
288                sid, l2cap_packets.InformationRequestResult.SUCCESS, 100)
289            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
290            self.cert_device.hci_acl_manager.SendAclData(
291                acl_manager_facade.AclData(
292                    handle=self.cert_acl_handle,
293                    payload=bytes(response_l2cap.Serialize())))
294            return
295        if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
296            response = l2cap_packets.InformationResponseExtendedFeaturesBuilder(
297                sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1,
298                0, 1, 0, 0, 0, 0)
299            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
300            self.cert_device.hci_acl_manager.SendAclData(
301                acl_manager_facade.AclData(
302                    handle=self.cert_acl_handle,
303                    payload=bytes(response_l2cap.Serialize())))
304            return
305        if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED:
306            response = l2cap_packets.InformationResponseFixedChannelsBuilder(
307                sid, l2cap_packets.InformationRequestResult.SUCCESS, 2)
308            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
309            self.cert_device.hci_acl_manager.SendAclData(
310                acl_manager_facade.AclData(
311                    handle=self.cert_acl_handle,
312                    payload=bytes(response_l2cap.Serialize())))
313            return
314
315    def _on_information_response_default(self, l2cap_control_view):
316        information_response = l2cap_packets.InformationResponseView(
317            l2cap_control_view)
318
319    def teardown_test(self):
320        self.device_under_test.rootservice.StopStack(
321            facade_rootservice.StopStackRequest())
322        self.cert_device.rootservice.StopStack(
323            facade_rootservice.StopStackRequest())
324
325    def _handle_control_packet(self, l2cap_packet):
326        packet_bytes = l2cap_packet.payload
327        l2cap_view = l2cap_packets.BasicFrameView(
328            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
329        if l2cap_view.GetChannelId() != 1:
330            return
331        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
332        if l2cap_control_view.GetCode(
333        ) == l2cap_packets.CommandCode.CONNECTION_REQUEST:
334            return self.on_connection_request(
335                l2cap_control_view
336            ) if self.on_connection_request else self._on_connection_request_default(
337                l2cap_control_view)
338        if l2cap_control_view.GetCode(
339        ) == l2cap_packets.CommandCode.CONNECTION_RESPONSE:
340            return self.on_connection_response(
341                l2cap_control_view
342            ) if self.on_connection_response else self._on_connection_response_default(
343                l2cap_control_view)
344        if l2cap_control_view.GetCode(
345        ) == l2cap_packets.CommandCode.CONFIGURATION_REQUEST:
346            return self.on_configuration_request(
347                l2cap_control_view
348            ) if self.on_configuration_request else self._on_configuration_request_default(
349                l2cap_control_view)
350        if l2cap_control_view.GetCode(
351        ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE:
352            return self.on_configuration_response(
353                l2cap_control_view
354            ) if self.on_configuration_response else self._on_configuration_response_default(
355                l2cap_control_view)
356        if l2cap_control_view.GetCode(
357        ) == l2cap_packets.CommandCode.DISCONNECTION_REQUEST:
358            return self.on_disconnection_request(
359                l2cap_control_view
360            ) if self.on_disconnection_request else self._on_disconnection_request_default(
361                l2cap_control_view)
362        if l2cap_control_view.GetCode(
363        ) == l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
364            return self.on_disconnection_response(
365                l2cap_control_view
366            ) if self.on_disconnection_response else self._on_disconnection_response_default(
367                l2cap_control_view)
368        if l2cap_control_view.GetCode(
369        ) == l2cap_packets.CommandCode.INFORMATION_REQUEST:
370            return self.on_information_request(
371                l2cap_control_view
372            ) if self.on_information_request else self._on_information_request_default(
373                l2cap_control_view)
374        if l2cap_control_view.GetCode(
375        ) == l2cap_packets.CommandCode.INFORMATION_RESPONSE:
376            return self.on_information_response(
377                l2cap_control_view
378            ) if self.on_information_response else self._on_information_response_default(
379                l2cap_control_view)
380        return
381
382    def is_correct_connection_request(self, l2cap_packet):
383        packet_bytes = l2cap_packet.payload
384        l2cap_view = l2cap_packets.BasicFrameView(
385            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
386        if l2cap_view.GetChannelId() != 1:
387            return False
388        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
389        return l2cap_control_view.GetCode(
390        ) == l2cap_packets.CommandCode.CONNECTION_REQUEST
391
392    def is_correct_configuration_response(self, l2cap_packet):
393        packet_bytes = l2cap_packet.payload
394        l2cap_view = l2cap_packets.BasicFrameView(
395            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
396        if l2cap_view.GetChannelId() != 1:
397            return False
398        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
399        if l2cap_control_view.GetCode(
400        ) != l2cap_packets.CommandCode.CONFIGURATION_RESPONSE:
401            return False
402        configuration_response_view = l2cap_packets.ConfigurationResponseView(
403            l2cap_control_view)
404        return configuration_response_view.GetResult(
405        ) == l2cap_packets.ConfigurationResponseResult.SUCCESS
406
407    def is_correct_configuration_request(self, l2cap_packet):
408        packet_bytes = l2cap_packet.payload
409        l2cap_view = l2cap_packets.BasicFrameView(
410            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
411        if l2cap_view.GetChannelId() != 1:
412            return False
413        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
414        return l2cap_control_view.GetCode(
415        ) == l2cap_packets.CommandCode.CONFIGURATION_REQUEST
416
417    def is_correct_disconnection_request(self, l2cap_packet):
418        packet_bytes = l2cap_packet.payload
419        l2cap_view = l2cap_packets.BasicFrameView(
420            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
421        if l2cap_view.GetChannelId() != 1:
422            return False
423        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
424        return l2cap_control_view.GetCode(
425        ) == l2cap_packets.CommandCode.DISCONNECTION_REQUEST
426
427    def cert_send_b_frame(self, b_frame):
428        self.cert_device.hci_acl_manager.SendAclData(
429            acl_manager_facade.AclData(
430                handle=self.cert_acl_handle,
431                payload=bytes(b_frame.Serialize())))
432
433    def get_req_seq_from_ertm_s_frame(self, scid, packet):
434        packet_bytes = packet.payload
435        l2cap_view = l2cap_packets.BasicFrameView(
436            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
437        if l2cap_view.GetChannelId() != scid:
438            return False
439        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
440        if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME:
441            return False
442        s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view)
443        return s_frame.GetReqSeq()
444
445    def get_s_from_ertm_s_frame(self, scid, packet):
446        packet_bytes = packet.payload
447        l2cap_view = l2cap_packets.BasicFrameView(
448            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
449        if l2cap_view.GetChannelId() != scid:
450            return False
451        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
452        if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME:
453            return False
454        s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view)
455        return s_frame.GetS()
456
457    def get_p_from_ertm_s_frame(self, scid, packet):
458        packet_bytes = packet.payload
459        l2cap_view = l2cap_packets.BasicFrameView(
460            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
461        if l2cap_view.GetChannelId() != scid:
462            return False
463        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
464        if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME:
465            return False
466        s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view)
467        return s_frame.GetP()
468
469    def get_f_from_ertm_s_frame(self, scid, packet):
470        packet_bytes = packet.payload
471        l2cap_view = l2cap_packets.BasicFrameView(
472            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
473        if l2cap_view.GetChannelId() != scid:
474            return False
475        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
476        if standard_view.GetFrameType() == l2cap_packets.FrameType.I_FRAME:
477            return False
478        s_frame = l2cap_packets.EnhancedSupervisoryFrameView(standard_view)
479        return s_frame.GetF()
480
481    def get_tx_seq_from_ertm_i_frame(self, scid, packet):
482        packet_bytes = packet.payload
483        l2cap_view = l2cap_packets.BasicFrameView(
484            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
485        if l2cap_view.GetChannelId() != scid:
486            return False
487        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
488        if standard_view.GetFrameType() == l2cap_packets.FrameType.S_FRAME:
489            return False
490        i_frame = l2cap_packets.EnhancedInformationFrameView(standard_view)
491        return i_frame.GetTxSeq()
492
493    def get_payload_from_ertm_i_frame(self, scid, packet):
494        packet_bytes = packet.payload
495        l2cap_view = l2cap_packets.BasicFrameView(
496            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
497        if l2cap_view.GetChannelId() != scid:
498            return False
499        standard_view = l2cap_packets.StandardFrameView(l2cap_view)
500        if standard_view.GetFrameType() == l2cap_packets.FrameType.S_FRAME:
501            return False
502        i_frame = l2cap_packets.EnhancedInformationFrameView(standard_view)
503        return i_frame.GetPayload()  # TODO(mylesgw): This doesn't work!
504
505    def _setup_link_from_cert(self):
506
507        self.device_under_test.neighbor.EnablePageScan(
508            neighbor_facade.EnableMsg(enabled=True))
509
510        with EventCallbackStream(
511                self.cert_device.hci_acl_manager.CreateConnection(
512                    acl_manager_facade.ConnectionMsg(
513                        address_type=int(
514                            hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
515                        address=bytes(self.dut_address.address)))
516        ) as connection_event_stream:
517
518            connection_event_asserts = EventAsserts(connection_event_stream)
519
520            # Cert gets ConnectionComplete with a handle and sends ACL data
521            handle = 0xfff
522
523            def get_handle(packet):
524                packet_bytes = packet.event
525                if b'\x03\x0b\x00' in packet_bytes:
526                    nonlocal handle
527                    cc_view = hci_packets.ConnectionCompleteView(
528                        hci_packets.EventPacketView(
529                            bt_packets.PacketViewLittleEndian(
530                                list(packet_bytes))))
531                    handle = cc_view.GetConnectionHandle()
532                    return True
533                return False
534
535            connection_event_asserts.assert_event_occurs(get_handle)
536
537        self.cert_acl_handle = handle
538        return handle
539
540    def _open_channel(
541            self,
542            cert_acl_data_stream,
543            signal_id=1,
544            cert_acl_handle=0x1,
545            scid=0x0101,
546            psm=0x33,
547            mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
548        cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
549
550        self.device_under_test.l2cap.SetDynamicChannel(
551            l2cap_facade_pb2.SetEnableDynamicChannelRequest(
552                psm=psm, retransmission_mode=mode))
553        open_channel = l2cap_packets.ConnectionRequestBuilder(
554            signal_id, psm, scid)
555        open_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, open_channel)
556        self.cert_send_b_frame(open_channel_l2cap)
557
558        def verify_connection_response(packet):
559            packet_bytes = packet.payload
560            l2cap_view = l2cap_packets.BasicFrameView(
561                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
562            l2cap_control_view = l2cap_packets.ControlView(
563                l2cap_view.GetPayload())
564            if l2cap_control_view.GetCode(
565            ) != l2cap_packets.CommandCode.CONNECTION_RESPONSE:
566                return False
567            connection_response_view = l2cap_packets.ConnectionResponseView(
568                l2cap_control_view)
569            return connection_response_view.GetSourceCid(
570            ) == scid and connection_response_view.GetResult(
571            ) == l2cap_packets.ConnectionResponseResult.SUCCESS and connection_response_view.GetDestinationCid(
572            ) != 0
573
574        cert_acl_data_asserts.assert_event_occurs(verify_connection_response)
575
576    def test_connect_dynamic_channel_and_send_data(self):
577        cert_acl_handle = self._setup_link_from_cert()
578        with EventCallbackStream(
579                self.cert_device.hci_acl_manager.FetchAclData(
580                    empty_proto.Empty())) as cert_acl_data_stream:
581            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
582            cert_acl_data_stream.register_callback(self._handle_control_packet)
583            psm = 0x33
584            scid = 0x41
585            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
586                               psm)
587            self.device_under_test.l2cap.SendDynamicChannelPacket(
588                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
589            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
590            cert_acl_data_asserts.assert_event_occurs(
591                lambda packet: b'abc' in packet.payload)
592
593    def test_fixed_channel(self):
594        cert_acl_handle = self._setup_link_from_cert()
595        self.device_under_test.l2cap.RegisterChannel(
596            l2cap_facade_pb2.RegisterChannelRequest(channel=2))
597        asserts.skip("FIXME: Not working")
598        with EventCallbackStream(
599                self.cert_device.hci_acl_manager.FetchAclData(
600                    empty_proto.Empty())) as cert_acl_data_stream:
601            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
602            self.device_under_test.l2cap.SendL2capPacket(
603                l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
604            cert_acl_data_asserts.assert_event_occurs(
605                lambda packet: b'123' in packet.payload)
606
607    def test_open_two_channels(self):
608        cert_acl_handle = self._setup_link_from_cert()
609        with EventCallbackStream(
610                self.cert_device.hci_acl_manager.FetchAclData(
611                    empty_proto.Empty())) as cert_acl_data_stream:
612            cert_acl_data_stream.register_callback(self._handle_control_packet)
613            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, 0x41,
614                               0x41)
615            self._open_channel(cert_acl_data_stream, 2, cert_acl_handle, 0x43,
616                               0x43)
617
618    def test_connect_and_send_data_ertm_no_segmentation(self):
619        cert_acl_handle = self._setup_link_from_cert()
620        with EventCallbackStream(
621                self.cert_device.hci_acl_manager.FetchAclData(
622                    empty_proto.Empty())) as cert_acl_data_stream:
623            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
624            cert_acl_data_stream.register_callback(self._handle_control_packet)
625            self.on_connection_response = self._on_connection_response_use_ertm
626
627            psm = 0x33
628            scid = 0x41
629            self._open_channel(
630                cert_acl_data_stream,
631                1,
632                cert_acl_handle,
633                scid,
634                psm,
635                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
636
637            # FIXME: Order shouldn't matter here
638            cert_acl_data_asserts.assert_event_occurs(
639                self.is_correct_configuration_response)
640            cert_acl_data_asserts.assert_event_occurs(
641                self.is_correct_configuration_request)
642
643            dcid = self.scid_to_dcid[scid]
644
645            self.device_under_test.l2cap.SendDynamicChannelPacket(
646                l2cap_facade_pb2.DynamicChannelPacket(
647                    psm=psm, payload=b'abc' * 34))
648            cert_acl_data_asserts.assert_event_occurs(
649                lambda packet: b'abc' * 34 in packet.payload)
650
651            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
652                dcid, 0, l2cap_packets.Final.NOT_SET, 1,
653                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
654                SAMPLE_PACKET)
655            self.cert_send_b_frame(i_frame)
656
657    def test_basic_operation_request_connection(self):
658        """
659        L2CAP/COS/CED/BV-01-C [Request Connection]
660        Verify that the IUT is able to request the connection establishment for an L2CAP data channel and
661        initiate the configuration procedure.
662        """
663        cert_acl_handle = self._setup_link_from_cert()
664
665        with EventCallbackStream(
666                self.cert_device.hci_acl_manager.FetchAclData(
667                    empty_proto.Empty())) as cert_acl_data_stream:
668            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
669            cert_acl_data_stream.register_callback(self._handle_control_packet)
670            psm = 0x33
671            # TODO: Use another test case
672            self.device_under_test.l2cap.OpenChannel(
673                l2cap_facade_pb2.OpenChannelRequest(
674                    remote=self.cert_address, psm=psm))
675            cert_acl_data_asserts.assert_event_occurs(
676                self.is_correct_connection_request)
677
678    def test_accept_disconnect(self):
679        """
680        L2CAP/COS/CED/BV-07-C
681        """
682        cert_acl_handle = self._setup_link_from_cert()
683
684        with EventCallbackStream(
685                self.cert_device.hci_acl_manager.FetchAclData(
686                    empty_proto.Empty())) as cert_acl_data_stream:
687            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
688            cert_acl_data_stream.register_callback(self._handle_control_packet)
689
690            scid = 0x41
691            psm = 0x33
692            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
693                               psm)
694
695            dcid = self.scid_to_dcid[scid]
696
697            close_channel = l2cap_packets.DisconnectionRequestBuilder(
698                1, dcid, scid)
699            close_channel_l2cap = l2cap_packets.BasicFrameBuilder(
700                1, close_channel)
701            self.cert_send_b_frame(close_channel_l2cap)
702
703            def verify_disconnection_response(packet):
704                packet_bytes = packet.payload
705                l2cap_view = l2cap_packets.BasicFrameView(
706                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
707                l2cap_control_view = l2cap_packets.ControlView(
708                    l2cap_view.GetPayload())
709                if l2cap_control_view.GetCode(
710                ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
711                    return False
712                disconnection_response_view = l2cap_packets.DisconnectionResponseView(
713                    l2cap_control_view)
714                return disconnection_response_view.GetSourceCid(
715                ) == scid and disconnection_response_view.GetDestinationCid(
716                ) == dcid
717
718            cert_acl_data_asserts.assert_event_occurs(
719                verify_disconnection_response)
720
721    def test_disconnect_on_timeout(self):
722        """
723        L2CAP/COS/CED/BV-08-C
724        """
725        cert_acl_handle = self._setup_link_from_cert()
726
727        with EventCallbackStream(
728                self.cert_device.hci_acl_manager.FetchAclData(
729                    empty_proto.Empty())) as cert_acl_data_stream:
730            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
731            scid = 0x41
732            psm = 0x33
733            cert_acl_data_stream.register_callback(self._handle_control_packet)
734
735            # Don't send configuration request or response back
736            self.on_configuration_request = lambda _: True
737            self.on_connection_response = lambda _: True
738
739            self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
740                               psm)
741
742            def is_configuration_response(l2cap_packet):
743                packet_bytes = l2cap_packet.payload
744                l2cap_view = l2cap_packets.BasicFrameView(
745                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
746                if l2cap_view.GetChannelId() != 1:
747                    return False
748                l2cap_control_view = l2cap_packets.ControlView(
749                    l2cap_view.GetPayload())
750                return l2cap_control_view.GetCode(
751                ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE
752
753            cert_acl_data_asserts.assert_none_matching(
754                is_configuration_response)
755
756    def test_respond_to_echo_request(self):
757        """
758        L2CAP/COS/ECH/BV-01-C [Respond to Echo Request]
759        Verify that the IUT responds to an echo request.
760        """
761        cert_acl_handle = self._setup_link_from_cert()
762        with EventCallbackStream(
763                self.cert_device.hci_acl_manager.FetchAclData(
764                    empty_proto.Empty())) as cert_acl_data_stream:
765            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
766            cert_acl_data_stream.register_callback(self._handle_control_packet)
767
768            echo_request = l2cap_packets.EchoRequestBuilder(
769                100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3))
770            echo_request_l2cap = l2cap_packets.BasicFrameBuilder(
771                1, echo_request)
772            self.cert_send_b_frame(echo_request_l2cap)
773
774            cert_acl_data_asserts.assert_event_occurs(
775                lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload
776            )
777
778    def test_reject_unknown_command(self):
779        """
780        L2CAP/COS/CED/BI-01-C
781        """
782        cert_acl_handle = self._setup_link_from_cert()
783        with EventCallbackStream(
784                self.cert_device.hci_acl_manager.FetchAclData(
785                    empty_proto.Empty())) as cert_acl_data_stream:
786            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
787            cert_acl_data_stream.register_callback(self._handle_control_packet)
788
789            invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
790            self.cert_device.hci_acl_manager.SendAclData(
791                acl_manager_facade.AclData(
792                    handle=cert_acl_handle,
793                    payload=bytes(invalid_command_packet)))
794
795            def is_command_reject(l2cap_packet):
796                packet_bytes = l2cap_packet.payload
797                l2cap_view = l2cap_packets.BasicFrameView(
798                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
799                if l2cap_view.GetChannelId() != 1:
800                    return False
801                l2cap_control_view = l2cap_packets.ControlView(
802                    l2cap_view.GetPayload())
803                return l2cap_control_view.GetCode(
804                ) == l2cap_packets.CommandCode.COMMAND_REJECT
805
806            cert_acl_data_asserts.assert_event_occurs(is_command_reject)
807
808    def test_query_for_1_2_features(self):
809        """
810        L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features]
811        """
812        cert_acl_handle = self._setup_link_from_cert()
813        with EventCallbackStream(
814                self.cert_device.hci_acl_manager.FetchAclData(
815                    empty_proto.Empty())) as cert_acl_data_stream:
816            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
817            cert_acl_data_asserts_alt = EventAsserts(cert_acl_data_stream)
818            cert_acl_data_stream.register_callback(self._handle_control_packet)
819            signal_id = 3
820            information_request = l2cap_packets.InformationRequestBuilder(
821                signal_id, l2cap_packets.InformationRequestInfoType.
822                EXTENDED_FEATURES_SUPPORTED)
823            information_request_l2cap = l2cap_packets.BasicFrameBuilder(
824                1, information_request)
825            self.cert_send_b_frame(information_request_l2cap)
826
827            def is_correct_information_response(l2cap_packet):
828                packet_bytes = l2cap_packet.payload
829                l2cap_view = l2cap_packets.BasicFrameView(
830                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
831                if l2cap_view.GetChannelId() != 1:
832                    return False
833                l2cap_control_view = l2cap_packets.ControlView(
834                    l2cap_view.GetPayload())
835                if l2cap_control_view.GetCode(
836                ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
837                    return False
838                information_response_view = l2cap_packets.InformationResponseView(
839                    l2cap_control_view)
840                return information_response_view.GetInfoType(
841                ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
842
843            cert_acl_data_asserts.assert_event_occurs(
844                is_correct_information_response)
845
846            def is_correct_information_request(l2cap_packet):
847                packet_bytes = l2cap_packet.payload
848                l2cap_view = l2cap_packets.BasicFrameView(
849                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
850                if l2cap_view.GetChannelId() != 1:
851                    return False
852                l2cap_control_view = l2cap_packets.ControlView(
853                    l2cap_view.GetPayload())
854                if l2cap_control_view.GetCode(
855                ) != l2cap_packets.CommandCode.INFORMATION_REQUEST:
856                    return False
857                information_request_view = l2cap_packets.InformationRequestView(
858                    l2cap_control_view)
859                return information_request_view.GetInfoType(
860                ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
861
862            cert_acl_data_asserts_alt.assert_event_occurs(
863                is_correct_information_request)
864
865    def test_extended_feature_info_response_ertm(self):
866        """
867        L2CAP/EXF/BV-01-C [Extended Features Information Response for Enhanced
868        Retransmission Mode]
869        """
870        cert_acl_handle = self._setup_link_from_cert()
871        with EventCallbackStream(
872                self.cert_device.hci_acl_manager.FetchAclData(
873                    empty_proto.Empty())) as cert_acl_data_stream:
874            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
875            cert_acl_data_stream.register_callback(self._handle_control_packet)
876
877            signal_id = 3
878            information_request = l2cap_packets.InformationRequestBuilder(
879                signal_id, l2cap_packets.InformationRequestInfoType.
880                EXTENDED_FEATURES_SUPPORTED)
881            information_request_l2cap = l2cap_packets.BasicFrameBuilder(
882                1, information_request)
883            self.cert_send_b_frame(information_request_l2cap)
884
885            def is_correct_information_response(l2cap_packet):
886                packet_bytes = l2cap_packet.payload
887                l2cap_view = l2cap_packets.BasicFrameView(
888                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
889                if l2cap_view.GetChannelId() != 1:
890                    return False
891                l2cap_control_view = l2cap_packets.ControlView(
892                    l2cap_view.GetPayload())
893                if l2cap_control_view.GetCode(
894                ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
895                    return False
896                information_response_view = l2cap_packets.InformationResponseView(
897                    l2cap_control_view)
898                if information_response_view.GetInfoType(
899                ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
900                    return False
901                extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
902                    information_response_view)
903                return extended_features_view.GetEnhancedRetransmissionMode()
904
905            cert_acl_data_asserts.assert_event_occurs(
906                is_correct_information_response)
907
908    def test_extended_feature_info_response_fcs(self):
909        """
910        L2CAP/EXF/BV-03-C [Extended Features Information Response for FCS Option]
911        Note: This is not mandated by L2CAP Spec
912        """
913        cert_acl_handle = self._setup_link_from_cert()
914        with EventCallbackStream(
915                self.cert_device.hci_acl_manager.FetchAclData(
916                    empty_proto.Empty())) as cert_acl_data_stream:
917            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
918            cert_acl_data_stream.register_callback(self._handle_control_packet)
919
920            signal_id = 3
921            information_request = l2cap_packets.InformationRequestBuilder(
922                signal_id, l2cap_packets.InformationRequestInfoType.
923                EXTENDED_FEATURES_SUPPORTED)
924            information_request_l2cap = l2cap_packets.BasicFrameBuilder(
925                1, information_request)
926            self.cert_send_b_frame(information_request_l2cap)
927
928            def is_correct_information_response(l2cap_packet):
929                packet_bytes = l2cap_packet.payload
930                l2cap_view = l2cap_packets.BasicFrameView(
931                    bt_packets.PacketViewLittleEndian(list(packet_bytes)))
932                if l2cap_view.GetChannelId() != 1:
933                    return False
934                l2cap_control_view = l2cap_packets.ControlView(
935                    l2cap_view.GetPayload())
936                if l2cap_control_view.GetCode(
937                ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
938                    return False
939                information_response_view = l2cap_packets.InformationResponseView(
940                    l2cap_control_view)
941                if information_response_view.GetInfoType(
942                ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
943                    return False
944                extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
945                    information_response_view)
946                return extended_features_view.GetFcsOption()
947
948            cert_acl_data_asserts.assert_event_occurs(
949                is_correct_information_response)
950
951    def test_config_channel_not_use_FCS(self):
952        """
953        L2CAP/FOC/BV-01-C [IUT Initiated Configuration of the FCS Option]
954        Verify the IUT can configure a channel to not use FCS in I/S-frames.
955        """
956        cert_acl_handle = self._setup_link_from_cert()
957        with EventCallbackStream(
958                self.cert_device.hci_acl_manager.FetchAclData(
959                    empty_proto.Empty())) as cert_acl_data_stream:
960            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
961            cert_acl_data_stream.register_callback(self._handle_control_packet)
962
963            self.on_connection_response = self._on_connection_response_use_ertm
964
965            psm = 0x33
966            scid = 0x41
967            self._open_channel(
968                cert_acl_data_stream,
969                1,
970                cert_acl_handle,
971                scid,
972                psm,
973                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
974            # FIXME: Order shouldn't matter here
975            cert_acl_data_asserts.assert_event_occurs(
976                self.is_correct_configuration_response)
977            cert_acl_data_asserts.assert_event_occurs(
978                self.is_correct_configuration_request)
979
980            self.device_under_test.l2cap.SendDynamicChannelPacket(
981                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
982            cert_acl_data_asserts.assert_event_occurs(
983                lambda packet: b"abc" in packet.payload)
984
985    def test_explicitly_request_use_FCS(self):
986        """
987        L2CAP/FOC/BV-02-C [Lower Tester Explicitly Requests FCS should be Used]
988        Verify the IUT will include the FCS in I/S-frames if the Lower Tester explicitly requests that FCS
989        should be used.
990        """
991
992        cert_acl_handle = self._setup_link_from_cert()
993        with EventCallbackStream(
994                self.cert_device.hci_acl_manager.FetchAclData(
995                    empty_proto.Empty())) as cert_acl_data_stream:
996            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
997            cert_acl_data_stream.register_callback(self._handle_control_packet)
998
999            self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
1000            psm = 0x33
1001            scid = 0x41
1002            self._open_channel(
1003                cert_acl_data_stream,
1004                1,
1005                cert_acl_handle,
1006                scid,
1007                psm,
1008                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1009            # FIXME: Order shouldn't matter here
1010            cert_acl_data_asserts.assert_event_occurs(
1011                self.is_correct_configuration_response)
1012            cert_acl_data_asserts.assert_event_occurs(
1013                self.is_correct_configuration_request)
1014
1015            self.device_under_test.l2cap.SendDynamicChannelPacket(
1016                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1017            cert_acl_data_asserts.assert_event_occurs(
1018                lambda packet: b"abc\x4f\xa3" in packet.payload
1019            )  # TODO: Use packet parser
1020
1021    def test_implicitly_request_use_FCS(self):
1022        """
1023        L2CAP/FOC/BV-03-C [Lower Tester Implicitly Requests FCS should be Used]
1024        TODO: Update this test case. What's the difference between this one and test_explicitly_request_use_FCS?
1025        """
1026        cert_acl_handle = self._setup_link_from_cert()
1027        with EventCallbackStream(
1028                self.cert_device.hci_acl_manager.FetchAclData(
1029                    empty_proto.Empty())) as cert_acl_data_stream:
1030            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1031            cert_acl_data_stream.register_callback(self._handle_control_packet)
1032
1033            self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
1034            psm = 0x33
1035            scid = 0x41
1036            self._open_channel(
1037                cert_acl_data_stream,
1038                1,
1039                cert_acl_handle,
1040                scid,
1041                psm,
1042                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1043            # FIXME: Order shouldn't matter here
1044            cert_acl_data_asserts.assert_event_occurs(
1045                self.is_correct_configuration_response)
1046            cert_acl_data_asserts.assert_event_occurs(
1047                self.is_correct_configuration_request)
1048
1049            self.device_under_test.l2cap.SendDynamicChannelPacket(
1050                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1051            cert_acl_data_asserts.assert_event_occurs(
1052                lambda packet: b"abc\x4f\xa3" in packet.payload
1053            )  # TODO: Use packet parser
1054
1055    def test_transmit_i_frames(self):
1056        """
1057        L2CAP/ERM/BV-01-C [Transmit I-frames]
1058        """
1059        cert_acl_handle = self._setup_link_from_cert()
1060        with EventCallbackStream(
1061                self.cert_device.hci_acl_manager.FetchAclData(
1062                    empty_proto.Empty())) as cert_acl_data_stream:
1063            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1064            cert_acl_data_stream.register_callback(self._handle_control_packet)
1065
1066            self.on_connection_response = self._on_connection_response_use_ertm
1067            psm = 0x33
1068            scid = 0x41
1069            self._open_channel(
1070                cert_acl_data_stream,
1071                1,
1072                cert_acl_handle,
1073                scid,
1074                psm,
1075                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1076
1077            dcid = self.scid_to_dcid[scid]
1078
1079            # FIXME: Order shouldn't matter here
1080            cert_acl_data_asserts.assert_event_occurs(
1081                self.is_correct_configuration_response)
1082            cert_acl_data_asserts.assert_event_occurs(
1083                self.is_correct_configuration_request)
1084
1085            self.device_under_test.l2cap.SendDynamicChannelPacket(
1086                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1087            cert_acl_data_asserts.assert_event_occurs(
1088                lambda packet: b"abc" in packet.payload)
1089
1090            # Assemble a sample packet. TODO: Use RawBuilder
1091            SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
1092
1093            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1094                dcid, 0, l2cap_packets.Final.NOT_SET, 1,
1095                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1096                SAMPLE_PACKET)
1097            self.cert_send_b_frame(i_frame)
1098
1099            self.device_under_test.l2cap.SendDynamicChannelPacket(
1100                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1101            cert_acl_data_asserts.assert_event_occurs(
1102                lambda packet: b"abc" in packet.payload)
1103
1104            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1105                dcid, 1, l2cap_packets.Final.NOT_SET, 2,
1106                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1107                SAMPLE_PACKET)
1108            self.cert_send_b_frame(i_frame)
1109
1110            self.device_under_test.l2cap.SendDynamicChannelPacket(
1111                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1112            cert_acl_data_asserts.assert_event_occurs(
1113                lambda packet: b"abc" in packet.payload)
1114
1115            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1116                dcid, 2, l2cap_packets.Final.NOT_SET, 3,
1117                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1118                SAMPLE_PACKET)
1119            self.cert_send_b_frame(i_frame)
1120
1121    def test_receive_i_frames(self):
1122        """
1123        L2CAP/ERM/BV-02-C [Receive I-Frames]
1124        Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP SDUs to the Upper Tester
1125        """
1126        cert_acl_handle = self._setup_link_from_cert()
1127        with EventCallbackStream(
1128                self.cert_device.hci_acl_manager.FetchAclData(
1129                    empty_proto.Empty())) as cert_acl_data_stream:
1130            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1131            cert_acl_data_stream.register_callback(self._handle_control_packet)
1132            self.on_connection_response = self._on_connection_response_use_ertm
1133
1134            psm = 0x33
1135            scid = 0x41
1136            self._open_channel(
1137                cert_acl_data_stream,
1138                1,
1139                cert_acl_handle,
1140                scid,
1141                psm,
1142                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1143
1144            dcid = self.scid_to_dcid[scid]
1145
1146            # FIXME: Order shouldn't matter here
1147            cert_acl_data_asserts.assert_event_occurs(
1148                self.is_correct_configuration_response)
1149            cert_acl_data_asserts.assert_event_occurs(
1150                self.is_correct_configuration_request)
1151
1152            for i in range(3):
1153                i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1154                    dcid, i, l2cap_packets.Final.NOT_SET, 0,
1155                    l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1156                    SAMPLE_PACKET)
1157                self.cert_send_b_frame(i_frame)
1158                cert_acl_data_asserts.assert_event_occurs(
1159                    lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
1160                )
1161
1162            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1163                dcid, 3, l2cap_packets.Final.NOT_SET, 0,
1164                l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
1165            self.cert_send_b_frame(i_frame)
1166            cert_acl_data_asserts.assert_event_occurs(
1167                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4
1168            )
1169
1170            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1171                dcid, 4, l2cap_packets.Final.NOT_SET, 0,
1172                l2cap_packets.SegmentationAndReassembly.CONTINUATION,
1173                SAMPLE_PACKET)
1174            self.cert_send_b_frame(i_frame)
1175            cert_acl_data_asserts.assert_event_occurs(
1176                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5
1177            )
1178
1179            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1180                dcid, 5, l2cap_packets.Final.NOT_SET, 0,
1181                l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
1182            self.cert_send_b_frame(i_frame)
1183            cert_acl_data_asserts.assert_event_occurs(
1184                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6
1185            )
1186
1187    def test_acknowledging_received_i_frames(self):
1188        """
1189        L2CAP/ERM/BV-03-C [Acknowledging Received I-Frames]
1190        Verify the IUT sends S-frame [RR] with the Poll bit not set to acknowledge data received from the
1191        Lower Tester
1192        """
1193        cert_acl_handle = self._setup_link_from_cert()
1194        with EventCallbackStream(
1195                self.cert_device.hci_acl_manager.FetchAclData(
1196                    empty_proto.Empty())) as cert_acl_data_stream:
1197            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1198            cert_acl_data_stream.register_callback(self._handle_control_packet)
1199            self.on_connection_response = self._on_connection_response_use_ertm
1200
1201            psm = 0x33
1202            scid = 0x41
1203            self._open_channel(
1204                cert_acl_data_stream,
1205                1,
1206                cert_acl_handle,
1207                scid,
1208                psm,
1209                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1210
1211            dcid = self.scid_to_dcid[scid]
1212
1213            # FIXME: Order shouldn't matter here
1214            cert_acl_data_asserts.assert_event_occurs(
1215                self.is_correct_configuration_response)
1216            cert_acl_data_asserts.assert_event_occurs(
1217                self.is_correct_configuration_request)
1218
1219            for i in range(3):
1220                i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1221                    dcid, i, l2cap_packets.Final.NOT_SET, 0,
1222                    l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1223                    SAMPLE_PACKET)
1224                self.cert_send_b_frame(i_frame)
1225                cert_acl_data_asserts.assert_event_occurs(
1226                    lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
1227                )
1228
1229            cert_acl_data_asserts.assert_none_matching(
1230                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
1231                timedelta(seconds=1))
1232
1233    def test_resume_transmitting_when_received_rr(self):
1234        """
1235        L2CAP/ERM/BV-05-C [Resume Transmitting I-Frames when an S-Frame [RR] is Received]
1236        Verify the IUT will cease transmission of I-frames when the negotiated TxWindow is full. Verify the
1237        IUT will resume transmission of I-frames when an S-frame [RR] is received that acknowledges
1238        previously sent I-frames.
1239        """
1240        self.ertm_tx_window_size = 1
1241        cert_acl_handle = self._setup_link_from_cert()
1242        with EventCallbackStream(
1243                self.cert_device.hci_acl_manager.FetchAclData(
1244                    empty_proto.Empty())) as cert_acl_data_stream:
1245            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1246            cert_acl_data_stream.register_callback(self._handle_control_packet)
1247            self.on_connection_response = self._on_connection_response_use_ertm
1248
1249            psm = 0x33
1250            scid = 0x41
1251            self._open_channel(
1252                cert_acl_data_stream,
1253                1,
1254                cert_acl_handle,
1255                scid,
1256                psm,
1257                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1258
1259            dcid = self.scid_to_dcid[scid]
1260
1261            # FIXME: Order shouldn't matter here
1262            cert_acl_data_asserts.assert_event_occurs(
1263                self.is_correct_configuration_response)
1264            cert_acl_data_asserts.assert_event_occurs(
1265                self.is_correct_configuration_request)
1266
1267            self.device_under_test.l2cap.SendDynamicChannelPacket(
1268                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1269            self.device_under_test.l2cap.SendDynamicChannelPacket(
1270                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
1271
1272            # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
1273            cert_acl_data_asserts.assert_event_occurs(
1274                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1275            )
1276            cert_acl_data_asserts.assert_none_matching(
1277                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
1278            )
1279            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1280                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
1281                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
1282                1)
1283            self.cert_send_b_frame(s_frame)
1284            cert_acl_data_asserts.assert_event_occurs(
1285                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
1286            )
1287
1288    def test_resume_transmitting_when_acknowledge_previously_sent(self):
1289        """
1290        L2CAP/ERM/BV-06-C [Resume Transmitting I-Frames when an I-Frame is Received]
1291        Verify the IUT will cease transmission of I-frames when the negotiated TxWindow is full. Verify the
1292        IUT will resume transmission of I-frames when an I-frame is received that acknowledges previously
1293        sent I-frames.
1294        """
1295        self.ertm_tx_window_size = 1
1296        cert_acl_handle = self._setup_link_from_cert()
1297        with EventCallbackStream(
1298                self.cert_device.hci_acl_manager.FetchAclData(
1299                    empty_proto.Empty())) as cert_acl_data_stream:
1300            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1301            cert_acl_data_stream.register_callback(self._handle_control_packet)
1302            self.on_connection_response = self._on_connection_response_use_ertm
1303
1304            psm = 0x33
1305            scid = 0x41
1306            self._open_channel(
1307                cert_acl_data_stream,
1308                1,
1309                cert_acl_handle,
1310                scid,
1311                psm,
1312                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1313
1314            dcid = self.scid_to_dcid[scid]
1315
1316            # FIXME: Order shouldn't matter here
1317            cert_acl_data_asserts.assert_event_occurs(
1318                self.is_correct_configuration_response)
1319            cert_acl_data_asserts.assert_event_occurs(
1320                self.is_correct_configuration_request)
1321
1322            self.device_under_test.l2cap.SendDynamicChannelPacket(
1323                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1324            self.device_under_test.l2cap.SendDynamicChannelPacket(
1325                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
1326
1327            cert_acl_data_asserts.assert_event_occurs(
1328                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1329            )
1330            # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
1331            cert_acl_data_asserts.assert_none_matching(
1332                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
1333                timedelta(seconds=1))
1334
1335            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1336                dcid, 0, l2cap_packets.Final.NOT_SET, 1,
1337                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1338                SAMPLE_PACKET)
1339            self.cert_send_b_frame(i_frame)
1340
1341            cert_acl_data_asserts.assert_event_occurs(
1342                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
1343            )
1344
1345            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1346                dcid, 1, l2cap_packets.Final.NOT_SET, 2,
1347                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1348                SAMPLE_PACKET)
1349            self.cert_send_b_frame(i_frame)
1350
1351    def test_transmit_s_frame_rr_with_poll_bit_set(self):
1352        """
1353        L2CAP/ERM/BV-08-C [Send S-Frame [RR] with Poll Bit Set]
1354        Verify the IUT sends an S-frame [RR] with the Poll bit set when its retransmission timer expires.
1355        """
1356        cert_acl_handle = self._setup_link_from_cert()
1357        with EventCallbackStream(
1358                self.cert_device.hci_acl_manager.FetchAclData(
1359                    empty_proto.Empty())) as cert_acl_data_stream:
1360            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1361            cert_acl_data_stream.register_callback(self._handle_control_packet)
1362            self.on_connection_response = self._on_connection_response_use_ertm
1363
1364            psm = 0x33
1365            scid = 0x41
1366            self._open_channel(
1367                cert_acl_data_stream,
1368                1,
1369                cert_acl_handle,
1370                scid,
1371                psm,
1372                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1373
1374            # FIXME: Order shouldn't matter here
1375            cert_acl_data_asserts.assert_event_occurs(
1376                self.is_correct_configuration_response)
1377            cert_acl_data_asserts.assert_event_occurs(
1378                self.is_correct_configuration_request)
1379
1380            self.device_under_test.l2cap.SendDynamicChannelPacket(
1381                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1382            # TODO: Always use their retransmission timeout value
1383            time.sleep(2)
1384            cert_acl_data_asserts.assert_event_occurs(
1385                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
1386            )
1387
1388    def test_transmit_s_frame_rr_with_final_bit_set(self):
1389        """
1390        L2CAP/ERM/BV-09-C [Send S-Frame [RR] with Final Bit Set]
1391        Verify the IUT responds with an S-frame [RR] with the Final bit set after receiving an S-frame [RR]
1392        with the Poll bit set.
1393        """
1394        cert_acl_handle = self._setup_link_from_cert()
1395        with EventCallbackStream(
1396                self.cert_device.hci_acl_manager.FetchAclData(
1397                    empty_proto.Empty())) as cert_acl_data_stream:
1398            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1399            cert_acl_data_stream.register_callback(self._handle_control_packet)
1400            self.on_connection_response = self._on_connection_response_use_ertm
1401
1402            psm = 0x33
1403            scid = 0x41
1404            self._open_channel(
1405                cert_acl_data_stream,
1406                1,
1407                cert_acl_handle,
1408                scid,
1409                psm,
1410                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1411
1412            # FIXME: Order shouldn't matter here
1413            cert_acl_data_asserts.assert_event_occurs(
1414                self.is_correct_configuration_response)
1415            cert_acl_data_asserts.assert_event_occurs(
1416                self.is_correct_configuration_request)
1417
1418            dcid = self.scid_to_dcid[scid]
1419
1420            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1421                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
1422                l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
1423            self.cert_send_b_frame(s_frame)
1424
1425            cert_acl_data_asserts.assert_event_occurs(
1426                lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE
1427            )
1428
1429    def test_s_frame_transmissions_exceed_max_transmit(self):
1430        """
1431        L2CAP/ERM/BV-11-C [S-Frame Transmissions Exceed MaxTransmit]
1432        Verify the IUT will close the channel when the Monitor Timer expires.
1433        """
1434        asserts.skip("Need to configure DUT to have a shorter timer")
1435        cert_acl_handle = self._setup_link_from_cert()
1436        with EventCallbackStream(
1437                self.cert_device.hci_acl_manager.FetchAclData(
1438                    empty_proto.Empty())) as cert_acl_data_stream:
1439            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1440            cert_acl_data_stream.register_callback(self._handle_control_packet)
1441            self.on_connection_response = self._on_connection_response_use_ertm
1442
1443            psm = 0x33
1444            scid = 0x41
1445            self._open_channel(
1446                cert_acl_data_stream,
1447                1,
1448                cert_acl_handle,
1449                scid,
1450                psm,
1451                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1452
1453            # FIXME: Order shouldn't matter here
1454            cert_acl_data_asserts.assert_event_occurs(
1455                self.is_correct_configuration_response)
1456            cert_acl_data_asserts.assert_event_occurs(
1457                self.is_correct_configuration_request)
1458
1459            dcid = self.scid_to_dcid[scid]
1460
1461            self.device_under_test.l2cap.SendDynamicChannelPacket(
1462                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1463
1464            # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
1465            time.sleep(362)
1466            cert_acl_data_asserts.assert_event_occurs(
1467                self.is_correct_disconnection_request)
1468
1469    def test_i_frame_transmissions_exceed_max_transmit(self):
1470        """
1471        L2CAP/ERM/BV-12-C [I-Frame Transmissions Exceed MaxTransmit]
1472        Verify the IUT will close the channel when it receives an S-frame [RR] with the final bit set that does
1473        not acknowledge the previous I-frame sent by the IUT.
1474        """
1475        cert_acl_handle = self._setup_link_from_cert()
1476        with EventCallbackStream(
1477                self.cert_device.hci_acl_manager.FetchAclData(
1478                    empty_proto.Empty())) as cert_acl_data_stream:
1479            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1480            cert_acl_data_stream.register_callback(self._handle_control_packet)
1481            self.on_connection_response = self._on_connection_response_use_ertm
1482
1483            psm = 0x33
1484            scid = 0x41
1485            self._open_channel(
1486                cert_acl_data_stream,
1487                1,
1488                cert_acl_handle,
1489                scid,
1490                psm,
1491                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1492
1493            # FIXME: Order shouldn't matter here
1494            cert_acl_data_asserts.assert_event_occurs(
1495                self.is_correct_configuration_response)
1496            cert_acl_data_asserts.assert_event_occurs(
1497                self.is_correct_configuration_request)
1498
1499            dcid = self.scid_to_dcid[scid]
1500
1501            self.device_under_test.l2cap.SendDynamicChannelPacket(
1502                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1503
1504            cert_acl_data_asserts.assert_event_occurs(
1505                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1506            )
1507
1508            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1509                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
1510                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
1511                0)
1512            self.cert_send_b_frame(s_frame)
1513
1514            cert_acl_data_asserts.assert_event_occurs(
1515                self.is_correct_disconnection_request)
1516
1517    def test_respond_to_rej(self):
1518        """
1519        L2CAP/ERM/BV-13-C [Respond to S-Frame [REJ]]
1520        Verify the IUT retransmits I-frames starting from the sequence number specified in the S-frame [REJ].
1521        """
1522        self.ertm_tx_window_size = 2
1523        self.ertm_max_transmit = 2
1524        cert_acl_handle = self._setup_link_from_cert()
1525        with EventCallbackStream(
1526                self.cert_device.hci_acl_manager.FetchAclData(
1527                    empty_proto.Empty())) as cert_acl_data_stream:
1528            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1529            cert_acl_data_stream.register_callback(self._handle_control_packet)
1530            self.on_connection_response = self._on_connection_response_use_ertm
1531
1532            psm = 0x33
1533            scid = 0x41
1534            self._open_channel(
1535                cert_acl_data_stream,
1536                1,
1537                cert_acl_handle,
1538                scid,
1539                psm,
1540                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1541
1542            # FIXME: Order shouldn't matter here
1543            cert_acl_data_asserts.assert_event_occurs(
1544                self.is_correct_configuration_response)
1545            cert_acl_data_asserts.assert_event_occurs(
1546                self.is_correct_configuration_request)
1547
1548            dcid = self.scid_to_dcid[scid]
1549
1550            self.device_under_test.l2cap.SendDynamicChannelPacket(
1551                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1552            self.device_under_test.l2cap.SendDynamicChannelPacket(
1553                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1554            for i in range(2):
1555                cert_acl_data_asserts.assert_event_occurs(
1556                    lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
1557                    timeout=timedelta(seconds=0.5))
1558
1559            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1560                dcid, l2cap_packets.SupervisoryFunction.REJECT,
1561                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
1562            self.cert_send_b_frame(s_frame)
1563
1564            for i in range(2):
1565                cert_acl_data_asserts.assert_event_occurs(
1566                    lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
1567                    timeout=timedelta(seconds=0.5))
1568
1569    def test_receive_s_frame_rr_final_bit_set(self):
1570        """
1571        L2CAP/ERM/BV-18-C [Receive S-Frame [RR] Final Bit = 1]
1572        Verify the IUT will retransmit any previously sent I-frames unacknowledged by receipt of an S-Frame
1573        [RR] with the Final Bit set.
1574        """
1575        cert_acl_handle = self._setup_link_from_cert()
1576        with EventCallbackStream(
1577                self.cert_device.hci_acl_manager.FetchAclData(
1578                    empty_proto.Empty())) as cert_acl_data_stream:
1579            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1580            cert_acl_data_stream.register_callback(self._handle_control_packet)
1581            self.on_connection_response = self._on_connection_response_use_ertm
1582
1583            psm = 0x33
1584            scid = 0x41
1585            self._open_channel(
1586                cert_acl_data_stream,
1587                1,
1588                cert_acl_handle,
1589                scid,
1590                psm,
1591                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1592
1593            # FIXME: Order shouldn't matter here
1594            cert_acl_data_asserts.assert_event_occurs(
1595                self.is_correct_configuration_response)
1596            cert_acl_data_asserts.assert_event_occurs(
1597                self.is_correct_configuration_request)
1598
1599            dcid = self.scid_to_dcid[scid]
1600
1601            self.device_under_test.l2cap.SendDynamicChannelPacket(
1602                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1603
1604            # TODO: Always use their retransmission timeout value
1605            time.sleep(2)
1606            cert_acl_data_asserts.assert_event_occurs(
1607                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
1608            )
1609
1610            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1611                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
1612                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
1613                0)
1614            self.cert_send_b_frame(s_frame)
1615
1616            cert_acl_data_asserts.assert_event_occurs(
1617                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1618            )
1619
1620    def test_receive_i_frame_final_bit_set(self):
1621        """
1622        L2CAP/ERM/BV-19-C [Receive I-Frame Final Bit = 1]
1623        Verify the IUT will retransmit any previously sent I-frames unacknowledged by receipt of an I-frame
1624        with the final bit set.
1625        """
1626        cert_acl_handle = self._setup_link_from_cert()
1627        with EventCallbackStream(
1628                self.cert_device.hci_acl_manager.FetchAclData(
1629                    empty_proto.Empty())) as cert_acl_data_stream:
1630            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1631            cert_acl_data_stream.register_callback(self._handle_control_packet)
1632            self.on_connection_response = self._on_connection_response_use_ertm
1633
1634            psm = 0x33
1635            scid = 0x41
1636            self._open_channel(
1637                cert_acl_data_stream,
1638                1,
1639                cert_acl_handle,
1640                scid,
1641                psm,
1642                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1643
1644            # FIXME: Order shouldn't matter here
1645            cert_acl_data_asserts.assert_event_occurs(
1646                self.is_correct_configuration_response)
1647            cert_acl_data_asserts.assert_event_occurs(
1648                self.is_correct_configuration_request)
1649
1650            dcid = self.scid_to_dcid[scid]
1651
1652            self.device_under_test.l2cap.SendDynamicChannelPacket(
1653                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1654
1655            # TODO: Always use their retransmission timeout value
1656            time.sleep(2)
1657            cert_acl_data_asserts.assert_event_occurs(
1658                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
1659            )
1660
1661            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1662                dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
1663                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1664                SAMPLE_PACKET)
1665            self.cert_send_b_frame(i_frame)
1666
1667            cert_acl_data_asserts.assert_event_occurs(
1668                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1669            )
1670
1671    def test_recieve_rnr(self):
1672        """
1673        L2CAP/ERM/BV-20-C [Enter Remote Busy Condition]
1674        Verify the IUT will not retransmit any I-frames when it receives a remote busy indication from the
1675        Lower Tester (S-frame [RNR]).
1676        """
1677        cert_acl_handle = self._setup_link_from_cert()
1678        with EventCallbackStream(
1679                self.cert_device.hci_acl_manager.FetchAclData(
1680                    empty_proto.Empty())) as cert_acl_data_stream:
1681            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1682            cert_acl_data_stream.register_callback(self._handle_control_packet)
1683            self.on_connection_response = self._on_connection_response_use_ertm
1684
1685            psm = 0x33
1686            scid = 0x41
1687            self._open_channel(
1688                cert_acl_data_stream,
1689                1,
1690                cert_acl_handle,
1691                scid,
1692                psm,
1693                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1694
1695            # FIXME: Order shouldn't matter here
1696            cert_acl_data_asserts.assert_event_occurs(
1697                self.is_correct_configuration_response)
1698            cert_acl_data_asserts.assert_event_occurs(
1699                self.is_correct_configuration_request)
1700
1701            dcid = self.scid_to_dcid[scid]
1702
1703            self.device_under_test.l2cap.SendDynamicChannelPacket(
1704                l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
1705
1706            # TODO: Always use their retransmission timeout value
1707            time.sleep(2)
1708            cert_acl_data_asserts.assert_event_occurs(
1709                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
1710            )
1711
1712            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1713                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
1714                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
1715                0)
1716            self.cert_send_b_frame(s_frame)
1717
1718            cert_acl_data_asserts.assert_none_matching(
1719                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1720            )
1721
1722    def test_sent_rej_lost(self):
1723        """
1724        L2CAP/ERM/BI-01-C [S-Frame [REJ] Lost or Corrupted]
1725        Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the S-frame [REJ] sent from the IUT
1726        is lost.
1727        """
1728        self.ertm_tx_window_size = 5
1729        cert_acl_handle = self._setup_link_from_cert()
1730        with EventCallbackStream(
1731                self.cert_device.hci_acl_manager.FetchAclData(
1732                    empty_proto.Empty())) as cert_acl_data_stream:
1733            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1734            cert_acl_data_stream.register_callback(self._handle_control_packet)
1735            self.on_connection_response = self._on_connection_response_use_ertm
1736
1737            psm = 0x33
1738            scid = 0x41
1739            self._open_channel(
1740                cert_acl_data_stream,
1741                1,
1742                cert_acl_handle,
1743                scid,
1744                psm,
1745                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1746
1747            # FIXME: Order shouldn't matter here
1748            cert_acl_data_asserts.assert_event_occurs(
1749                self.is_correct_configuration_response)
1750            cert_acl_data_asserts.assert_event_occurs(
1751                self.is_correct_configuration_request)
1752
1753            dcid = self.scid_to_dcid[scid]
1754
1755            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1756                dcid, 0, l2cap_packets.Final.NOT_SET, 0,
1757                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1758                SAMPLE_PACKET)
1759            self.cert_send_b_frame(i_frame)
1760            cert_acl_data_asserts.assert_event_occurs(
1761                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1
1762            )
1763
1764            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1765                dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET,
1766                0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1767                SAMPLE_PACKET)
1768            self.cert_send_b_frame(i_frame)
1769            cert_acl_data_asserts.assert_event_occurs(
1770                lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT
1771            )
1772
1773            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1774                dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
1775                l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
1776            self.cert_send_b_frame(s_frame)
1777
1778            cert_acl_data_asserts.assert_event_occurs(
1779                lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE)
1780            for i in range(1, self.ertm_tx_window_size):
1781                i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1782                    dcid, i, l2cap_packets.Final.NOT_SET, 0,
1783                    l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1784                    SAMPLE_PACKET)
1785                self.cert_send_b_frame(i_frame)
1786                cert_acl_data_asserts.assert_event_occurs(
1787                    lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
1788                )
1789
1790    def test_handle_duplicate_srej(self):
1791        """
1792        L2CAP/ERM/BI-03-C [Handle Duplicate S-Frame [SREJ]]
1793        Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ.
1794        """
1795        cert_acl_handle = self._setup_link_from_cert()
1796        with EventCallbackStream(
1797                self.cert_device.hci_acl_manager.FetchAclData(
1798                    empty_proto.Empty())) as cert_acl_data_stream:
1799            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1800            cert_acl_data_stream.register_callback(self._handle_control_packet)
1801            self.on_connection_response = self._on_connection_response_use_ertm
1802
1803            psm = 0x33
1804            scid = 0x41
1805            self._open_channel(
1806                cert_acl_data_stream,
1807                1,
1808                cert_acl_handle,
1809                scid,
1810                psm,
1811                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1812
1813            # FIXME: Order shouldn't matter here
1814            cert_acl_data_asserts.assert_event_occurs(
1815                self.is_correct_configuration_response)
1816            cert_acl_data_asserts.assert_event_occurs(
1817                self.is_correct_configuration_request)
1818
1819            dcid = self.scid_to_dcid[scid]
1820
1821            self.device_under_test.l2cap.SendDynamicChannelPacket(
1822                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1823            self.device_under_test.l2cap.SendDynamicChannelPacket(
1824                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1825            cert_acl_data_asserts.assert_event_occurs(
1826                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
1827                timeout=timedelta(0.5))
1828            cert_acl_data_asserts.assert_event_occurs(
1829                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
1830                timeout=timedelta(0.5))
1831            cert_acl_data_asserts.assert_event_occurs(
1832                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
1833            )
1834
1835            # Send SREJ with F not set
1836            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1837                dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
1838                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
1839            self.cert_send_b_frame(s_frame)
1840
1841            cert_acl_data_asserts.assert_none(timeout=timedelta(seconds=0.5))
1842            # Send SREJ with F set
1843            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1844                dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
1845                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
1846                0)
1847            self.cert_send_b_frame(s_frame)
1848
1849            cert_acl_data_asserts.assert_event_occurs(
1850                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1851            )
1852
1853    def test_handle_receipt_rej_and_rr_with_f_set(self):
1854        """
1855        L2CAP/ERM/BI-04-C [Handle Receipt of S-Frame [REJ] and S-Frame [RR, F=1] that Both Require Retransmission of the Same I-Frames]
1856        Verify the IUT will only retransmit the requested I-frames once after receiving an S-frame [REJ]
1857        followed by an S-frame [RR] with the Final bit set that indicates the same I-frames should be
1858        retransmitted.
1859        """
1860        cert_acl_handle = self._setup_link_from_cert()
1861        with EventCallbackStream(
1862                self.cert_device.hci_acl_manager.FetchAclData(
1863                    empty_proto.Empty())) as cert_acl_data_stream:
1864            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1865            cert_acl_data_stream.register_callback(self._handle_control_packet)
1866            self.on_connection_response = self._on_connection_response_use_ertm
1867
1868            psm = 0x33
1869            scid = 0x41
1870            self._open_channel(
1871                cert_acl_data_stream,
1872                1,
1873                cert_acl_handle,
1874                scid,
1875                psm,
1876                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1877
1878            # FIXME: Order shouldn't matter here
1879            cert_acl_data_asserts.assert_event_occurs(
1880                self.is_correct_configuration_response)
1881            cert_acl_data_asserts.assert_event_occurs(
1882                self.is_correct_configuration_request)
1883
1884            dcid = self.scid_to_dcid[scid]
1885
1886            self.device_under_test.l2cap.SendDynamicChannelPacket(
1887                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1888            self.device_under_test.l2cap.SendDynamicChannelPacket(
1889                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1890            cert_acl_data_asserts.assert_event_occurs(
1891                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
1892                timeout=timedelta(0.5))
1893            cert_acl_data_asserts.assert_event_occurs(
1894                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
1895                timeout=timedelta(0.5))
1896            cert_acl_data_asserts.assert_event_occurs(
1897                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
1898                timeout=timedelta(2))
1899
1900            # Send REJ with F not set
1901            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1902                dcid, l2cap_packets.SupervisoryFunction.REJECT,
1903                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
1904            self.cert_send_b_frame(s_frame)
1905
1906            cert_acl_data_asserts.assert_none(timeout=timedelta(seconds=0.5))
1907
1908            # Send RR with F set
1909            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1910                dcid, l2cap_packets.SupervisoryFunction.REJECT,
1911                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
1912                0)
1913            self.cert_send_b_frame(s_frame)
1914
1915            cert_acl_data_asserts.assert_event_occurs(
1916                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1917            )
1918            cert_acl_data_asserts.assert_event_occurs(
1919                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
1920            )
1921
1922    def test_handle_rej_and_i_frame_with_f_set(self):
1923        """
1924        L2CAP/ERM/BI-05-C [Handle receipt of S-Frame [REJ] and I-Frame [F=1] that Both Require Retransmission of the Same I-Frames]
1925        Verify the IUT will only retransmit the requested I-frames once after receiving an S-frame [REJ]
1926        followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted.
1927        """
1928        cert_acl_handle = self._setup_link_from_cert()
1929        with EventCallbackStream(
1930                self.cert_device.hci_acl_manager.FetchAclData(
1931                    empty_proto.Empty())) as cert_acl_data_stream:
1932            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
1933            cert_acl_data_stream.register_callback(self._handle_control_packet)
1934            self.on_connection_response = self._on_connection_response_use_ertm
1935
1936            psm = 0x33
1937            scid = 0x41
1938            self._open_channel(
1939                cert_acl_data_stream,
1940                1,
1941                cert_acl_handle,
1942                scid,
1943                psm,
1944                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
1945
1946            # FIXME: Order shouldn't matter here
1947            cert_acl_data_asserts.assert_event_occurs(
1948                self.is_correct_configuration_response)
1949            cert_acl_data_asserts.assert_event_occurs(
1950                self.is_correct_configuration_request)
1951
1952            dcid = self.scid_to_dcid[scid]
1953
1954            self.device_under_test.l2cap.SendDynamicChannelPacket(
1955                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1956            self.device_under_test.l2cap.SendDynamicChannelPacket(
1957                l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
1958            cert_acl_data_asserts.assert_event_occurs(
1959                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
1960                timeout=timedelta(0.5))
1961            cert_acl_data_asserts.assert_event_occurs(
1962                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
1963                timeout=timedelta(0.5))
1964            cert_acl_data_asserts.assert_event_occurs(
1965                lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
1966                timeout=timedelta(2))
1967
1968            # Send SREJ with F not set
1969            s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
1970                dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
1971                l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
1972            self.cert_send_b_frame(s_frame)
1973
1974            cert_acl_data_asserts.assert_none(timeout=timedelta(seconds=0.5))
1975
1976            i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
1977                dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
1978                l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
1979                SAMPLE_PACKET)
1980            self.cert_send_b_frame(i_frame)
1981
1982            cert_acl_data_asserts.assert_event_occurs(
1983                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
1984            )
1985            cert_acl_data_asserts.assert_event_occurs(
1986                lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
1987            )
1988
1989    def test_initiated_configurtion_request_ertm(self):
1990        """
1991        L2CAP/CMC/BV-01-C [IUT Initiated Configuration of Enhanced Retransmission Mode]
1992        Verify the IUT can send a Configuration Request command containing the F&EC option that specifies
1993        Enhanced Retransmission Mode.
1994        """
1995        cert_acl_handle = self._setup_link_from_cert()
1996        with EventCallbackStream(
1997                self.cert_device.hci_acl_manager.FetchAclData(
1998                    empty_proto.Empty())) as cert_acl_data_stream:
1999            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
2000            cert_acl_data_stream.register_callback(self._handle_control_packet)
2001            self.on_connection_response = self._on_connection_response_use_ertm
2002
2003            psm = 0x33
2004            scid = 0x41
2005            self._open_channel(
2006                cert_acl_data_stream,
2007                1,
2008                cert_acl_handle,
2009                scid,
2010                psm,
2011                mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
2012
2013            # TODO: Fix this test. It doesn't work so far with PDL struct
2014
2015            cert_acl_data_asserts.assert_event_occurs(
2016                self.is_correct_configuration_request)
2017            asserts.skip("Struct not working")
2018
2019    def test_respond_configuration_request_ertm(self):
2020        """
2021        L2CAP/CMC/BV-02-C [Lower Tester Initiated Configuration of Enhanced Retransmission Mode]
2022        Verify the IUT can accept a Configuration Request from the Lower Tester containing an F&EC option
2023        that specifies Enhanced Retransmission Mode.
2024        """
2025        cert_acl_handle = self._setup_link_from_cert()
2026        with EventCallbackStream(
2027                self.cert_device.hci_acl_manager.FetchAclData(
2028                    empty_proto.Empty())) as cert_acl_data_stream:
2029            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
2030            cert_acl_data_stream.register_callback(self._handle_control_packet)
2031            psm = 1
2032            scid = 0x0101
2033            self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM
2034            self.device_under_test.l2cap.SetDynamicChannel(
2035                l2cap_facade_pb2.SetEnableDynamicChannelRequest(
2036                    psm=psm, retransmission_mode=self.retransmission_mode))
2037
2038            open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid)
2039            open_channel_l2cap = l2cap_packets.BasicFrameBuilder(
2040                1, open_channel)
2041            self.cert_send_b_frame(open_channel_l2cap)
2042
2043            # TODO: Verify that the type should be ERTM
2044            cert_acl_data_asserts.assert_event_occurs(
2045                self.is_correct_configuration_response)
2046