#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from google.protobuf import empty_pb2 as empty_proto

from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade_pb2
from blueberry.facade.l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType
from blueberry.facade.l2cap.le import facade_pb2 as l2cap_le_facade_pb2
from blueberry.facade.l2cap.le.facade_pb2 import SecurityLevel
from bluetooth_packets_python3 import l2cap_packets
from blueberry.tests.gd.cert.event_stream import FilteringEventStream
from blueberry.tests.gd.cert.event_stream import EventStream, IEventStream
from blueberry.tests.gd.cert.closable import Closable, safeClose
from blueberry.tests.gd.cert.py_hci import PyHci
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.matchers import L2capMatchers
from blueberry.tests.gd.cert.truth import assertThat
import hci_packets as hci


class PyL2capChannel(IEventStream):
    """
    Event-stream view of one classic L2CAP dynamic channel, identified by its PSM.

    Events are taken from the shared L2CAP data stream and filtered down to the
    packets whose payload matches this channel's PSM.
    """

    def __init__(self, device, psm, l2cap_stream):
        """
        :param device: device object exposing the classic `l2cap` facade
        :param psm: PSM identifying this channel
        :param l2cap_stream: shared event stream of L2CAP data to filter by PSM
        """
        self._device = device
        self._psm = psm
        # NOTE: the "_le_" attribute names are historical; this class talks to the
        # classic facade. They are kept as-is so existing users are not broken.
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))

    def get_event_queue(self):
        """Return the event queue of the PSM-filtered view of the L2CAP stream."""
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        """Ask the facade to send `payload` as a dynamic channel packet on this PSM."""
        self._device.l2cap.SendDynamicChannelPacket(
            l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))

    def close_channel(self):
        """Ask the facade to close the channel registered on this PSM."""
        self._device.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=self._psm))

    def set_traffic_paused(self, paused):
        """
        Forward a traffic-paused request for this PSM to the facade.

        :param paused: value passed through as the request's `paused` field
        """
        self._device.l2cap.SetTrafficPaused(l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused))


class _ClassicConnectionResponseFutureWrapper:
    """
    The future object returned when we send a connection request from DUT. Can be used to
    create the corresponding PyL2capChannel object later.

    Unlike the credit-based (LE) wrapper, get_channel() here does not inspect the
    result of the wrapped future before building the channel.
    """

    def __init__(self, grpc_response_future, device, psm, l2cap_stream):
        """
        :param grpc_response_future: future returned by the OpenChannel call
        :param device: device object exposing the classic `l2cap` facade
        :param psm: PSM the connection request was sent for
        :param l2cap_stream: shared event stream of L2CAP data
        """
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._psm = psm
        self._l2cap_stream = l2cap_stream

    def get_channel(self):
        """Return a new PyL2capChannel for the requested PSM."""
        return PyL2capChannel(self._device, self._psm, self._l2cap_stream)


class PyL2cap(Closable):
    """
    Helper around a device's classic L2CAP facade.

    Owns the L2CAP data stream, the security connection event stream and a PyHci
    instance; all three are released by close().
    """

    def __init__(self, device, cert_address, has_security=False):
        """
        :param device: device object exposing the classic `l2cap` facade
        :param cert_address: address of the CERT device, used as the remote in requests
        :param has_security: when false, this object registers for HCI
            LINK_KEY_REQUEST events itself
        """
        self._device = device
        self._cert_address = cert_address
        self._hci = PyHci(device)
        self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty()))
        self._security_connection_event_stream = EventStream(
            self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty()))
        # Truthiness test instead of `== False` (PEP 8); identical for bool arguments.
        if not has_security:
            self._hci.register_for_events(hci.EventCode.LINK_KEY_REQUEST)

    def close(self):
        """Close the event streams and the HCI helper owned by this object."""
        safeClose(self._l2cap_stream)
        safeClose(self._security_connection_event_stream)
        safeClose(self._hci)

    def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        """
        Register a dynamic channel with the facade and return a channel object for it.

        :param psm: PSM to register
        :param mode: retransmission / flow control mode passed to the facade
        :return: PyL2capChannel bound to `psm`
        """
        self._device.l2cap.SetDynamicChannel(
            l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode))
        return PyL2capChannel(self._device, psm, self._l2cap_stream)

    def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
        """
        Send open Dynamic channel request to CERT.
        Get a future for connection result, to be used after CERT accepts request

        :param psm: PSM to register and connect on
        :param mode: retransmission / flow control mode passed to the facade
        :return: _ClassicConnectionResponseFutureWrapper around the OpenChannel future
        """
        # The channel is registered first; the returned channel object is not needed here.
        self.register_dynamic_channel(psm, mode)
        response_future = self._device.l2cap.OpenChannel.future(
            l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode))

        return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream)

    def get_channel_queue_buffer_size(self):
        """Return the `size` field of the facade's GetChannelQueueDepth response."""
        return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size

    def initiate_connection_for_security(self):
        """
        Establish an ACL for the specific purpose of pairing devices
        """
        self._device.l2cap.InitiateConnectionForSecurity(self._cert_address)

    def get_security_connection_event_stream(self):
        """
        Stream of Link related events. Events are returned with an address.
        Events map to the LinkSecurityInterfaceListener callbacks
        """
        return self._security_connection_event_stream

    def security_link_hold(self):
        """
        Holds open the ACL indefinitely allowing for the security handshake
        to take place
        """
        self._device.l2cap.SecurityLinkHold(self._cert_address)

    def security_link_ensure_authenticated(self):
        """
        Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED
        """
        self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address)

    def security_link_release(self):
        """
        Releases a Held open ACL allowing for the ACL to time out after the default time
        """
        self._device.l2cap.SecurityLinkRelease(self._cert_address)

    def security_link_disconnect(self):
        """
        Immediately release and disconnect ACL
        """
        self._device.l2cap.SecurityLinkDisconnect(self._cert_address)

    def verify_security_connection(self):
        """
        Verify that we get a connection and a link key request

        NOTE(review): LINK_KEY_REQUEST is only registered by __init__ when
        has_security is false; presumably this check is meant for that
        configuration -- confirm against callers.
        """
        assertThat(self.get_security_connection_event_stream()).emits(
            lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED)
        assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest())


class PyLeL2capFixedChannel(IEventStream):
    """
    Event-stream view of one LE L2CAP fixed channel, identified by its CID.

    Events are taken from the shared LE L2CAP data stream and filtered down to
    the packets whose payload matches this channel's CID.
    """

    def __init__(self, device, cid, l2cap_stream):
        """
        :param device: device object exposing the `l2cap_le` facade
        :param cid: channel id identifying this fixed channel
        :param l2cap_stream: shared event stream of LE L2CAP data to filter by CID
        """
        self._device = device
        self._cid = cid
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
                                                       L2capMatchers.PacketPayloadWithMatchingCid(self._cid))

    def get_event_queue(self):
        """Return the event queue of the CID-filtered view of the LE L2CAP stream."""
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        """Ask the facade to send `payload` as a fixed channel packet on this CID."""
        self._device.l2cap_le.SendFixedChannelPacket(
            l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload))

    def close_channel(self):
        """Close the channel by asking the facade to disable this fixed channel (enable=False)."""
        self._device.l2cap_le.SetFixedChannel(
            l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False))


class PyLeL2capDynamicChannel(IEventStream):
    """
    Event-stream view of one LE L2CAP dynamic channel, identified by its PSM.

    Events are taken from the shared LE L2CAP data stream and filtered down to
    the packets whose payload matches this channel's PSM.
    """

    def __init__(self, device, cert_address, psm, l2cap_stream):
        """
        :param device: device object exposing the `l2cap_le` facade
        :param cert_address: address of the CERT device, used as the remote when closing
        :param psm: PSM identifying this channel
        :param l2cap_stream: shared event stream of LE L2CAP data to filter by PSM
        """
        self._device = device
        self._cert_address = cert_address
        self._psm = psm
        self._le_l2cap_stream = l2cap_stream
        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))

    def get_event_queue(self):
        """Return the event queue of the PSM-filtered view of the LE L2CAP stream."""
        return self._our_le_l2cap_view.get_event_queue()

    def send(self, payload):
        """Ask the facade to send `payload` as a dynamic channel packet on this PSM."""
        self._device.l2cap_le.SendDynamicChannelPacket(
            l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))

    def close_channel(self):
        """Ask the facade to close the dynamic channel to the CERT address on this PSM."""
        self._device.l2cap_le.CloseDynamicChannel(
            l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm))


class _CreditBasedConnectionResponseFutureWrapper:
    """
    The future object returned when we send a connection request from DUT. Can be used to get connection status and
    create the corresponding PyLeL2capDynamicChannel object later
    """

    def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream):
        """
        :param grpc_response_future: future returned by the OpenDynamicChannel call
        :param device: device object exposing the `l2cap_le` facade
        :param cert_address: address of the CERT device
        :param psm: PSM the connection request was sent for
        :param le_l2cap_stream: shared event stream of LE L2CAP data
        """
        self._grpc_response_future = grpc_response_future
        self._device = device
        self._cert_address = cert_address
        self._psm = psm
        self._le_l2cap_stream = le_l2cap_stream

    def get_status(self):
        """
        Return the connection result as a LeCreditBasedConnectionResponseResult.

        Reads `status` from the wrapped future's result().
        """
        return l2cap_packets.LeCreditBasedConnectionResponseResult(self._grpc_response_future.result().status)

    def get_channel(self):
        """
        Assert that the connection status is SUCCESS, then return a new
        PyLeL2capDynamicChannel for the requested PSM.
        """
        assertThat(self.get_status()).isEqualTo(l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS)
        return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream)


class PyLeL2cap(Closable):
    """
    Helper around a device's LE L2CAP facade.

    Owns the LE L2CAP data stream, which is released by close().
    """

    def __init__(self, device):
        """
        :param device: device object exposing the `l2cap_le` facade
        """
        self._device = device
        self._le_l2cap_stream = EventStream(self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))

    def close(self):
        """Close the LE L2CAP data stream owned by this object."""
        safeClose(self._le_l2cap_stream)

    def enable_fixed_channel(self, cid=4):
        """Ask the facade to enable the fixed channel with the given CID."""
        self._device.l2cap_le.SetFixedChannel(l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True))

    def get_fixed_channel(self, cid=4):
        """
        Return a PyLeL2capFixedChannel for `cid`.

        This only builds the channel object; it does not itself enable the channel.
        """
        return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)

    def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY):
        """
        Enable an LE dynamic channel on the facade and return a channel object for it.

        :param cert_address: address of the CERT device, stored on the returned channel
        :param psm: PSM to enable
        :param security_level: security level passed to the facade
        :return: PyLeL2capDynamicChannel bound to `psm`
        """
        self._device.l2cap_le.SetDynamicChannel(
            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, enable=True, security_level=security_level))
        return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream)

    def connect_coc_to_cert(self, cert_address, psm=0x33):
        """
        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request

        :param cert_address: address of the CERT device to connect to
        :param psm: PSM to register and connect on
        :return: _CreditBasedConnectionResponseFutureWrapper around the OpenDynamicChannel future
        """
        # Registered with the default security level; the returned channel object is not needed here.
        self.register_coc(cert_address, psm)
        response_future = self._device.l2cap_le.OpenDynamicChannel.future(
            l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address))

        return _CreditBasedConnectionResponseFutureWrapper(response_future, self._device, cert_address, psm,
                                                           self._le_l2cap_stream)

    def update_connection_parameter(self,
                                    conn_interval_min=0x10,
                                    conn_interval_max=0x10,
                                    conn_latency=0x0a,
                                    supervision_timeout=0x64,
                                    min_ce_length=12,
                                    max_ce_length=12):
        """
        Ask the facade to send a connection parameter update.

        Every argument is passed through unchanged as the same-named field of the
        ConnectionParameter request.
        """
        self._device.l2cap_le.SendConnectionParameterUpdate(
            l2cap_le_facade_pb2.ConnectionParameter(conn_interval_min=conn_interval_min,
                                                    conn_interval_max=conn_interval_max,
                                                    conn_latency=conn_latency,
                                                    supervision_timeout=supervision_timeout,
                                                    min_ce_length=min_ce_length,
                                                    max_ce_length=max_ce_length))