#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
17from blueberry.tests.gd.cert.captures import L2capCaptures
18from blueberry.tests.gd.cert.closable import Closable
19from blueberry.tests.gd.cert.closable import safeClose
20from blueberry.tests.gd.cert.event_stream import FilteringEventStream
21from blueberry.tests.gd.cert.event_stream import IEventStream
22from blueberry.tests.gd.cert.matchers import L2capMatchers
23from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
24from blueberry.tests.gd.cert.truth import assertThat
25import bluetooth_packets_python3 as bt_packets
26from bluetooth_packets_python3 import l2cap_packets
27from bluetooth_packets_python3.l2cap_packets import LeCommandCode
28from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
29
30
class CertLeL2capChannel(IEventStream):
    """Cert-side handle for a single L2CAP channel carried over an LE ACL link.

    Incoming traffic is exposed as an event stream built by filtering the ACL
    stream with ``L2capMatchers.ExtractBasicFrame(scid)``. Outgoing frames are
    addressed to ``dcid``. A local credit counter is decremented by one for
    every frame sent through ``send`` or ``send_first_le_i_frame``.
    """

    def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, initial_credits=0):
        """Record the channel endpoints and build the filtered view of the ACL stream.

        Args:
            device: stored on the instance; not otherwise used by this class.
            scid: channel ID used to filter incoming frames for this channel.
            dcid: channel ID written into frames sent by this channel.
            acl_stream: event stream that the filtered view is built on.
            acl: object whose ``send`` method receives the serialized frames.
            control_channel: channel used to send and observe signaling
                packets; callers in this file pass None for channels that
                do no signaling of their own.
            initial_credits: starting value of the local credit counter.
        """
        self._device = device
        self._scid = scid
        self._dcid = dcid
        self._acl = acl
        self._acl_stream = acl_stream
        self._control_channel = control_channel
        self._credits_left = initial_credits
        # Only frames extracted for our own scid are visible to consumers.
        self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid))

    def get_event_queue(self):
        """Return the event queue of the scid-filtered ACL view."""
        return self._our_acl_view.get_event_queue()

    def _send_frame_and_consume_credit(self, frame_builder):
        """Serialize ``frame_builder``, send it on the ACL, then use up one credit."""
        self._acl.send(frame_builder.Serialize())
        self._credits_left -= 1

    def send(self, packet):
        """Send ``packet`` to the remote inside a basic frame addressed to dcid."""
        self._send_frame_and_consume_credit(l2cap_packets.BasicFrameBuilder(self._dcid, packet))

    def send_first_le_i_frame(self, sdu_size, packet):
        """Send ``packet`` inside a first LE information frame carrying ``sdu_size``."""
        self._send_frame_and_consume_credit(
            l2cap_packets.FirstLeInformationFrameBuilder(self._dcid, sdu_size, packet))

    def disconnect_and_verify(self):
        """Send an LE disconnection request and assert the matching response arrives.

        Asserts first that this channel's scid is not 1.
        """
        assertThat(self._scid).isNotEqualTo(1)
        disconnection_request = l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid, self._scid)
        self._control_channel.send(disconnection_request)

        expected_response = L2capMatchers.LeDisconnectionResponse(self._scid, self._dcid)
        assertThat(self._control_channel).emits(expected_response)

    def verify_disconnect_request(self):
        """Assert the control channel emits an LE disconnection request for this channel."""
        expected_request = L2capMatchers.LeDisconnectionRequest(self._dcid, self._scid)
        assertThat(self._control_channel).emits(expected_request)

    def send_credits(self, num_credits):
        """Send an LE flow control credit packet granting ``num_credits`` for our scid."""
        credit_packet = l2cap_packets.LeFlowControlCreditBuilder(2, self._scid, num_credits)
        self._control_channel.send(credit_packet)

    def credits_left(self):
        """Return the current value of the local credit counter."""
        return self._credits_left
70
71
class CertLeL2cap(Closable):
    """Cert-side helper that drives LE L2CAP signaling over an LE ACL link.

    After ``connect_le_acl`` or ``wait_for_connection`` has established the
    link, ``control_channel`` is a ``CertLeL2capChannel`` on the signaling CID
    and every packet on the ACL stream is offered to
    ``_handle_control_packet``, which dispatches by command code through
    ``control_table``.
    """

    # CID of the signaling (control) channel. Frames on any other CID are
    # ignored by _handle_control_packet.
    _CONTROL_CID = 5

    def __init__(self, device):
        """Create the LE ACL manager for ``device``; no link is opened yet."""
        self._device = device
        self._le_acl_manager = PyLeAclManager(device)
        self._le_acl = None
        # Set by connect_le_acl() / wait_for_connection(); None until then so
        # the attribute always exists.
        self.control_channel = None

        # Command code -> handler invoked by _handle_control_packet.
        self.control_table = {
            LeCommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default,
            LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default,
            LeCommandCode.LE_FLOW_CONTROL_CREDIT: self._on_credit,
        }

        # Credit-based channels opened through this object, keyed by the scid
        # they were created with.
        self._cid_to_cert_channels = {}

    def close(self):
        """Close the ACL manager, then the ACL connection.

        The ACL connection is closed even when closing the manager raises;
        the exception still propagates afterwards.
        """
        try:
            self._le_acl_manager.close()
        finally:
            # _le_acl is still None if no link was ever established; it is
            # handed to safeClose either way (presumably None-tolerant — confirm).
            safeClose(self._le_acl)

    def _attach_control_channel(self):
        """Build the control channel on the current ACL and hook up packet dispatch."""
        self.control_channel = CertLeL2capChannel(
            self._device,
            self._CONTROL_CID,
            self._CONTROL_CID,
            self._get_acl_stream(),
            self._le_acl,
            control_channel=None)
        self._get_acl_stream().register_callback(self._handle_control_packet)

    def connect_le_acl(self, remote_addr):
        """Initiate an LE ACL connection to ``remote_addr`` and set up the control channel."""
        self._le_acl = self._le_acl_manager.connect_to_remote(remote_addr)
        self._attach_control_channel()

    def wait_for_connection(self):
        """Wait for an incoming LE ACL connection and set up the control channel."""
        self._le_acl = self._le_acl_manager.wait_for_connection()
        self._attach_control_channel()

    def open_fixed_channel(self, cid=4):
        """Return a channel using ``cid`` as both scid and dcid, with no control channel and zero credits."""
        return CertLeL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._le_acl, None, 0)

    def open_channel(self, signal_id, psm, scid, mtu=1000, mps=100, initial_credit=6):
        """Request a credit-based channel and return it once the response is seen.

        Sends an LE credit based connection request, asserts that a connection
        response is emitted on the control channel, and builds the channel
        from the destination CID and initial credits carried in that response.
        The channel is registered under ``scid`` for credit tracking.
        """
        self.control_channel.send(
            l2cap_packets.LeCreditBasedConnectionRequestBuilder(signal_id, psm, scid, mtu, mps, initial_credit))

        response = L2capCaptures.CreditBasedConnectionResponse()
        assertThat(self.control_channel).emits(response)
        connection_response = response.get()
        channel = CertLeL2capChannel(self._device, scid, connection_response.GetDestinationCid(),
                                     self._get_acl_stream(), self._le_acl, self.control_channel,
                                     connection_response.GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def open_channel_with_expected_result(self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS):
        """Send a fixed connection request for ``psm`` and assert the response carries ``result``.

        No channel object is created or registered.
        """
        self.control_channel.send(l2cap_packets.LeCreditBasedConnectionRequestBuilder(1, psm, 0x40, 1000, 100, 6))

        response = L2capMatchers.CreditBasedConnectionResponse(result)
        assertThat(self.control_channel).emits(response)

    def verify_and_respond_open_channel_from_remote(self,
                                                    psm=0x33,
                                                    result=LeCreditBasedConnectionResponseResult.SUCCESS,
                                                    our_scid=None):
        """Assert a connection request for ``psm`` arrives, answer it, and return the channel.

        The channel's credit counter starts at the initial credits carried in
        the remote's request. The channel is registered under the scid used
        in the response.
        """
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(request.get(), result, our_scid)
        channel = CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl,
                                     self.control_channel,
                                     request.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def verify_and_reject_open_channel_from_remote(self, psm=0x33):
        """Assert a connection request for ``psm`` arrives and answer it with a command reject."""
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        sid = request.get().GetIdentifier()
        reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid)
        self.control_channel.send(reject)

    def verify_le_flow_control_credit(self, channel):
        """Assert the control channel emits a flow control credit matching ``channel``'s dcid."""
        assertThat(self.control_channel).emits(L2capMatchers.LeFlowControlCredit(channel._dcid))

    def _respond_connection_request_default(self,
                                            request,
                                            result=LeCreditBasedConnectionResponseResult.SUCCESS,
                                            our_scid=None):
        """Answer ``request`` with a connection response echoing its MTU, MPS and credits.

        Returns:
            ``(our_scid, our_dcid)`` where ``our_dcid`` is the source CID from
            the request.
        """
        sid = request.GetIdentifier()
        their_scid = request.GetSourceCid()
        mtu = request.GetMtu()
        mps = request.GetMps()
        initial_credits = request.GetInitialCredits()
        # If our_scid is not specified, reuse their scid as our scid.
        if our_scid is None:
            our_scid = their_scid
        our_dcid = their_scid
        response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(sid, our_scid, mtu, mps, initial_credits,
                                                                        result)
        self.control_channel.send(response)
        return (our_scid, our_dcid)

    def get_control_channel(self):
        """Return the control channel, or None if no ACL link has been set up yet."""
        return self.control_channel

    def _get_acl_stream(self):
        """Return the event stream of the current LE ACL connection."""
        return self._le_acl.acl_stream

    def _on_disconnection_request_default(self, request):
        """Default handler: answer a disconnection request with the matching response."""
        disconnection_request = l2cap_packets.LeDisconnectionRequestView(request)
        sid = disconnection_request.GetIdentifier()
        scid = disconnection_request.GetSourceCid()
        dcid = disconnection_request.GetDestinationCid()
        response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid)
        self.control_channel.send(response)

    def _on_disconnection_response_default(self, request):
        """Default handler for a disconnection response: parse it and take no further action."""
        # The view is built but not used; the call is kept in case the
        # constructor validates the packet (assumption — confirm).
        l2cap_packets.LeDisconnectionResponseView(request)

    def _on_credit(self, l2cap_le_control_view):
        """Add the credits from a flow control credit packet to the matching channel.

        Packets whose CID has no registered channel are ignored.
        """
        credit_view = l2cap_packets.LeFlowControlCreditView(l2cap_le_control_view)
        # NOTE(review): channels are registered under the cert-side scid, while
        # this CID comes from the remote's credit packet. Confirm both always
        # refer to the same value, otherwise these credits are silently dropped.
        channel = self._cid_to_cert_channels.get(credit_view.GetCid())
        if channel is None:
            return
        channel._credits_left += credit_view.GetCredits()

    def _handle_control_packet(self, l2cap_packet):
        """ACL stream callback: dispatch control-channel packets through ``control_table``.

        Frames on other CIDs, and command codes without a registered handler,
        are ignored.
        """
        packet_bytes = l2cap_packet.payload
        l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if l2cap_view.GetChannelId() != self._CONTROL_CID:
            return
        request = l2cap_packets.LeControlView(l2cap_view.GetPayload())
        handler = self.control_table.get(request.GetCode())
        if handler is not None:
            handler(request)
201