• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18
19from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade_pb2
20from blueberry.facade.l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType
21from blueberry.facade.l2cap.le import facade_pb2 as l2cap_le_facade_pb2
22from blueberry.facade.l2cap.le.facade_pb2 import SecurityLevel
23from bluetooth_packets_python3 import l2cap_packets
24from blueberry.tests.gd.cert.event_stream import FilteringEventStream
25from blueberry.tests.gd.cert.event_stream import EventStream, IEventStream
26from blueberry.tests.gd.cert.closable import Closable, safeClose
27from blueberry.tests.gd.cert.py_hci import PyHci
28from blueberry.tests.gd.cert.matchers import HciMatchers
29from blueberry.tests.gd.cert.matchers import L2capMatchers
30from blueberry.tests.gd.cert.truth import assertThat
31import hci_packets as hci
32
33
class PyL2capChannel(IEventStream):
    """
    View of a shared L2CAP event stream restricted to a single PSM, together with
    helpers that send per-PSM requests through ``device.l2cap``.
    """

    def __init__(self, device, psm, l2cap_stream):
        """
        :param device: object whose ``l2cap`` attribute exposes the facade calls used below
        :param psm: PSM used for filtering and placed in every request this object sends
        :param l2cap_stream: shared event stream that gets filtered down to ``psm``
        """
        self._device = device
        self._psm = psm
        # NOTE: the attribute names carry an "le" infix although every request in this
        # class is built from the classic facade protos (l2cap_facade_pb2).
        self._le_l2cap_stream = l2cap_stream
        psm_matcher = L2capMatchers.PacketPayloadWithMatchingPsm(psm)
        self._our_le_l2cap_view = FilteringEventStream(l2cap_stream, psm_matcher)

    def get_event_queue(self):
        """Return the event queue of the PSM-filtered view."""
        filtered_view = self._our_le_l2cap_view
        return filtered_view.get_event_queue()

    def send(self, payload):
        """Send ``payload`` as a DynamicChannelPacket addressed to this object's PSM."""
        packet = l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload)
        self._device.l2cap.SendDynamicChannelPacket(packet)

    def close_channel(self):
        """Issue a CloseChannel request for this object's PSM."""
        request = l2cap_facade_pb2.CloseChannelRequest(psm=self._psm)
        self._device.l2cap.CloseChannel(request)

    def set_traffic_paused(self, paused):
        """Issue a SetTrafficPaused request for this object's PSM with the given ``paused`` value."""
        request = l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused)
        self._device.l2cap.SetTrafficPaused(request)
55
56
class _ClassicConnectionResponseFutureWrapper(object):
    """
    Handle returned after the DUT sends a classic connection request. It keeps the gRPC
    response future together with everything needed to build the matching PyL2capChannel
    later on.
    """

    def __init__(self, grpc_response_future, device, psm, l2cap_stream):
        """
        :param grpc_response_future: future of the open-channel call (stored only)
        :param device: device handle forwarded to the channel created by get_channel()
        :param psm: PSM forwarded to the channel created by get_channel()
        :param l2cap_stream: event stream forwarded to the channel created by get_channel()
        """
        self._l2cap_stream = l2cap_stream
        self._psm = psm
        self._device = device
        self._grpc_response_future = grpc_response_future

    def get_channel(self):
        """
        Build a new PyL2capChannel from the stored device, PSM and stream.
        The stored response future is not consulted here.
        """
        channel = PyL2capChannel(self._device, self._psm, self._l2cap_stream)
        return channel
71
72
class PyL2cap(Closable):
    """
    Helper around the classic L2CAP facade of a device (``device.l2cap``).

    Owns two event streams (L2CAP data and security connection events) and a PyHci
    instance; all three are released by close().
    """

    def __init__(self, device, cert_address, has_security=False):
        """
        :param device: object whose ``l2cap`` attribute exposes the classic L2CAP facade
        :param cert_address: address passed as remote/target in the requests sent by this object
        :param has_security: when falsy, this object registers with HCI for
                             LINK_KEY_REQUEST events
        """
        self._device = device
        self._cert_address = cert_address
        self._hci = PyHci(device)
        self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty()))
        self._security_connection_event_stream = EventStream(
            self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty()))
        # Truthiness test instead of comparing against the False literal (PEP 8).
        if not has_security:
            self._hci.register_for_events(hci.EventCode.LINK_KEY_REQUEST)

    def close(self):
        """Close both event streams and the HCI helper via safeClose."""
        safeClose(self._l2cap_stream)
        safeClose(self._security_connection_event_stream)
        safeClose(self._hci)

    def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        """
        Send a SetDynamicChannel request for ``psm`` with retransmission mode ``mode``.

        :return: a PyL2capChannel for ``psm`` backed by this object's L2CAP data stream
        """
        self._device.l2cap.SetDynamicChannel(
            l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode))
        return PyL2capChannel(self._device, psm, self._l2cap_stream)

    def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        """
        Send open Dynamic channel request to CERT.
        Get a future for connection result, to be used after CERT accepts request

        :return: a _ClassicConnectionResponseFutureWrapper holding the OpenChannel future
        """
        # The channel object returned by the registration is not needed here; only the
        # SetDynamicChannel request it sends is.
        self.register_dynamic_channel(psm, mode)
        response_future = self._device.l2cap.OpenChannel.future(
            l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode))

        return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream)

    def get_channel_queue_buffer_size(self):
        """Return the ``size`` field of the GetChannelQueueDepth response."""
        return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size

    def initiate_connection_for_security(self):
        """
        Establish an ACL for the specific purpose of pairing devices
        """
        self._device.l2cap.InitiateConnectionForSecurity(self._cert_address)

    def get_security_connection_event_stream(self):
        """
        Stream of Link related events.  Events are returned with an address.
        Events map to the LinkSecurityInterfaceListener callbacks
        """
        return self._security_connection_event_stream

    def security_link_hold(self):
        """
        Holds open the ACL indefinitely allowing for the security handshake
        to take place
        """
        self._device.l2cap.SecurityLinkHold(self._cert_address)

    def security_link_ensure_authenticated(self):
        """
        Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED
        """
        self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address)

    def security_link_release(self):
        """
        Releases a Held open ACL allowing for the ACL to time out after the default time
        """
        self._device.l2cap.SecurityLinkRelease(self._cert_address)

    def security_link_disconnect(self):
        """
        Immediately release and disconnect ACL
        """
        self._device.l2cap.SecurityLinkDisconnect(self._cert_address)

    def verify_security_connection(self):
        """
        Verify that we get a connection and a link key request
        """
        assertThat(self.get_security_connection_event_stream()).emits(
            lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED)
        assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest())
154
155
class PyLeL2capFixedChannel(IEventStream):
    """
    View of a shared LE L2CAP event stream restricted to a single CID, together with
    helpers that send fixed-channel requests through ``device.l2cap_le``.
    """

    def __init__(self, device, cid, l2cap_stream):
        """
        :param device: object whose ``l2cap_le`` attribute exposes the facade calls used below
        :param cid: channel id used for filtering and placed in every request this object sends
        :param l2cap_stream: shared event stream that gets filtered down to ``cid``
        """
        self._device = device
        self._cid = cid
        self._le_l2cap_stream = l2cap_stream
        cid_matcher = L2capMatchers.PacketPayloadWithMatchingCid(cid)
        self._our_le_l2cap_view = FilteringEventStream(l2cap_stream, cid_matcher)

    def get_event_queue(self):
        """Return the event queue of the CID-filtered view."""
        filtered_view = self._our_le_l2cap_view
        return filtered_view.get_event_queue()

    def send(self, payload):
        """Send ``payload`` as a FixedChannelPacket addressed to this object's CID."""
        packet = l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload)
        self._device.l2cap_le.SendFixedChannelPacket(packet)

    def close_channel(self):
        """Send a SetFixedChannel request with ``enable=False`` for this object's CID."""
        request = l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False)
        self._device.l2cap_le.SetFixedChannel(request)
175
176
class PyLeL2capDynamicChannel(IEventStream):
    """
    View of a shared LE L2CAP event stream restricted to a single PSM, together with
    helpers that send dynamic-channel requests through ``device.l2cap_le``.
    """

    def __init__(self, device, cert_address, psm, l2cap_stream):
        """
        :param device: object whose ``l2cap_le`` attribute exposes the facade calls used below
        :param cert_address: value sent as ``remote`` when the channel is closed
        :param psm: PSM used for filtering and placed in every request this object sends
        :param l2cap_stream: shared event stream that gets filtered down to ``psm``
        """
        self._device = device
        self._cert_address = cert_address
        self._psm = psm
        self._le_l2cap_stream = l2cap_stream
        psm_matcher = L2capMatchers.PacketPayloadWithMatchingPsm(psm)
        self._our_le_l2cap_view = FilteringEventStream(l2cap_stream, psm_matcher)

    def get_event_queue(self):
        """Return the event queue of the PSM-filtered view."""
        filtered_view = self._our_le_l2cap_view
        return filtered_view.get_event_queue()

    def send(self, payload):
        """Send ``payload`` as a DynamicChannelPacket addressed to this object's PSM."""
        packet = l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload)
        self._device.l2cap_le.SendDynamicChannelPacket(packet)

    def close_channel(self):
        """Send a CloseDynamicChannel request for the stored remote address and PSM."""
        request = l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm)
        self._device.l2cap_le.CloseDynamicChannel(request)
197
198
class _CreditBasedConnectionResponseFutureWrapper(object):
    """
    Handle returned after the DUT sends an LE credit based connection request. It exposes
    the connection status carried by the gRPC response and builds the matching
    PyLeL2capDynamicChannel once that status is SUCCESS.
    """

    def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream):
        """
        :param grpc_response_future: future whose result carries a ``status`` field
        :param device: device handle forwarded to the channel created by get_channel()
        :param cert_address: remote address forwarded to the channel created by get_channel()
        :param psm: PSM forwarded to the channel created by get_channel()
        :param le_l2cap_stream: event stream forwarded to the channel created by get_channel()
        """
        self._le_l2cap_stream = le_l2cap_stream
        self._psm = psm
        self._cert_address = cert_address
        self._device = device
        self._grpc_response_future = grpc_response_future

    def get_status(self):
        """
        Read ``status`` from the future's result and convert it to a
        LeCreditBasedConnectionResponseResult.
        """
        response = self._grpc_response_future.result()
        return l2cap_packets.LeCreditBasedConnectionResponseResult(response.status)

    def get_channel(self):
        """
        Assert that the connection status equals SUCCESS, then build a new
        PyLeL2capDynamicChannel from the stored device, address, PSM and stream.
        """
        expected = l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS
        assertThat(self.get_status()).isEqualTo(expected)
        channel = PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream)
        return channel
218
219
class PyLeL2cap(Closable):
    """
    Helper around the LE L2CAP facade of a device (``device.l2cap_le``).

    Owns one event stream built from FetchL2capData, which is released by close().
    """

    def __init__(self, device):
        """
        :param device: object whose ``l2cap_le`` attribute exposes the LE L2CAP facade
        """
        self._device = device
        data_source = self._device.l2cap_le.FetchL2capData(empty_proto.Empty())
        self._le_l2cap_stream = EventStream(data_source)

    def close(self):
        """Close the LE L2CAP data stream via safeClose."""
        safeClose(self._le_l2cap_stream)

    def enable_fixed_channel(self, cid=4):
        """Send a SetFixedChannel request with ``enable=True`` for ``cid``."""
        request = l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True)
        self._device.l2cap_le.SetFixedChannel(request)

    def get_fixed_channel(self, cid=4):
        """Return a new PyLeL2capFixedChannel for ``cid`` backed by this object's data stream."""
        channel = PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)
        return channel

    def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY):
        """
        Send a SetDynamicChannel request with ``enable=True`` for ``psm`` and ``security_level``.

        :return: a new PyLeL2capDynamicChannel for ``cert_address`` and ``psm``
        """
        request = l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm,
                                                                     enable=True,
                                                                     security_level=security_level)
        self._device.l2cap_le.SetDynamicChannel(request)
        channel = PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream)
        return channel

    def connect_coc_to_cert(self, cert_address, psm=0x33):
        """
        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request
        """
        # Registration uses the default security level; the channel object it returns is not needed here.
        self.register_coc(cert_address, psm)
        open_request = l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address)
        pending_response = self._device.l2cap_le.OpenDynamicChannel.future(open_request)

        return _CreditBasedConnectionResponseFutureWrapper(pending_response, self._device, cert_address, psm,
                                                           self._le_l2cap_stream)

    def update_connection_parameter(self,
                                    conn_interval_min=0x10,
                                    conn_interval_max=0x10,
                                    conn_latency=0x0a,
                                    supervision_timeout=0x64,
                                    min_ce_length=12,
                                    max_ce_length=12):
        """
        Send a SendConnectionParameterUpdate request; each argument is copied into the
        ConnectionParameter field of the same name.
        """
        parameters = l2cap_le_facade_pb2.ConnectionParameter(conn_interval_min=conn_interval_min,
                                                             conn_interval_max=conn_interval_max,
                                                             conn_latency=conn_latency,
                                                             supervision_timeout=supervision_timeout,
                                                             min_ce_length=min_ce_length,
                                                             max_ce_length=max_ce_length)
        self._device.l2cap_le.SendConnectionParameterUpdate(parameters)
265