#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.captures import L2capCaptures
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.event_stream import FilteringEventStream
from blueberry.tests.gd.cert.event_stream import IEventStream
from blueberry.tests.gd.cert.matchers import L2capMatchers
from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
from blueberry.tests.gd.cert.truth import assertThat
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import l2cap_packets
from bluetooth_packets_python3.l2cap_packets import LeCommandCode
from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult

# Channel id used for the control (signalling) channel, both as source and
# destination CID and as the filter for incoming control packets.
_LE_SIGNALLING_CID = 5


class CertLeL2capChannel(IEventStream):
    """One L2CAP channel on top of an LE ACL link, as seen from the cert side.

    Outgoing frames are addressed to ``dcid`` and written to ``acl``; the event
    queue exposed by this stream is ``acl_stream`` filtered through
    ``L2capMatchers.ExtractBasicFrame(scid)``.
    """

    def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, initial_credits=0):
        """Store the channel endpoints and build the filtered ACL view.

        Args:
            device: device handle this channel belongs to (stored, not used here).
            scid: our source channel id; used to filter incoming frames.
            dcid: destination channel id placed in every outgoing frame.
            acl_stream: event stream of the underlying ACL connection.
            acl: ACL connection object providing ``send(bytes)``.
            control_channel: channel used for signalling commands, or None when
                this channel is itself the control channel / a fixed channel.
            initial_credits: starting value of the local credit counter.
        """
        self._device = device
        self._scid = scid
        self._dcid = dcid
        self._acl_stream = acl_stream
        self._acl = acl
        self._control_channel = control_channel
        self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid))
        self._credits_left = initial_credits

    def get_event_queue(self):
        """Return the event queue of the ACL view filtered for this channel."""
        return self._our_acl_view.get_event_queue()

    def send(self, packet):
        """Wrap ``packet`` in a basic frame for ``dcid``, send it and use one credit."""
        frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
        self._acl.send(frame.Serialize())
        self._credits_left -= 1

    def send_first_le_i_frame(self, sdu_size, packet):
        """Send ``packet`` as a first LE information frame carrying ``sdu_size``; uses one credit."""
        frame = l2cap_packets.FirstLeInformationFrameBuilder(self._dcid, sdu_size, packet)
        self._acl.send(frame.Serialize())
        self._credits_left -= 1

    def disconnect_and_verify(self):
        """Send an LE disconnection request and assert the matching response is emitted."""
        assertThat(self._scid).isNotEqualTo(1)
        # The request is sent with signal identifier 1.
        self._control_channel.send(l2cap_packets.LeDisconnectionRequestBuilder(1, self._dcid, self._scid))

        assertThat(self._control_channel).emits(L2capMatchers.LeDisconnectionResponse(self._scid, self._dcid))

    def verify_disconnect_request(self):
        """Assert that the control channel emits a disconnection request for this channel."""
        assertThat(self._control_channel).emits(L2capMatchers.LeDisconnectionRequest(self._dcid, self._scid))

    def send_credits(self, num_credits):
        """Send an LE flow control credit packet granting ``num_credits`` for our source CID."""
        # The packet is sent with signal identifier 2.
        self._control_channel.send(l2cap_packets.LeFlowControlCreditBuilder(2, self._scid, num_credits))

    def credits_left(self):
        """Return the current value of the local credit counter."""
        return self._credits_left


class CertLeL2cap(Closable):
    """Cert-side LE L2CAP helper: owns the LE ACL link, the control channel and opened channels."""

    def __init__(self, device):
        """Create the LE ACL manager for ``device``; no connection is made yet."""
        self._device = device
        self._le_acl_manager = PyLeAclManager(device)
        self._le_acl = None
        # Populated by connect_le_acl() / wait_for_connection().
        self.control_channel = None

        # Dispatch table from control command code to its handler.
        self.control_table = {
            LeCommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default,
            LeCommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default,
            LeCommandCode.LE_FLOW_CONTROL_CREDIT: self._on_credit,
        }

        # Channels registered by open_channel() and
        # verify_and_respond_open_channel_from_remote(), keyed by our source CID.
        self._cid_to_cert_channels = {}

    def close(self):
        """Close the ACL manager and then the ACL connection (if any)."""
        self._le_acl_manager.close()
        safeClose(self._le_acl)

    def connect_le_acl(self, remote_addr):
        """Initiate an LE ACL connection to ``remote_addr`` and set up the control channel."""
        self._le_acl = self._le_acl_manager.connect_to_remote(remote_addr)
        self._setup_control_channel()

    def wait_for_connection(self):
        """Wait for an incoming LE ACL connection and set up the control channel."""
        self._le_acl = self._le_acl_manager.wait_for_connection()
        self._setup_control_channel()

    def _setup_control_channel(self):
        """Create the control channel on the current ACL and register the control packet handler."""
        self.control_channel = CertLeL2capChannel(
            self._device,
            _LE_SIGNALLING_CID,
            _LE_SIGNALLING_CID,
            self._get_acl_stream(),
            self._le_acl,
            control_channel=None)
        self._get_acl_stream().register_callback(self._handle_control_packet)

    def open_fixed_channel(self, cid=4):
        """Return a channel using ``cid`` as both source and destination CID.

        The channel has no control channel and zero initial credits, and is
        not recorded in the CID-to-channel table.
        """
        return CertLeL2capChannel(self._device, cid, cid, self._get_acl_stream(), self._le_acl, None, 0)

    def open_channel(self, signal_id, psm, scid, mtu=1000, mps=100, initial_credit=6):
        """Send a credit based connection request and return the resulting channel.

        The destination CID and initial credits of the returned channel are
        taken from the captured connection response. The channel is registered
        under ``scid``.
        """
        self.control_channel.send(
            l2cap_packets.LeCreditBasedConnectionRequestBuilder(signal_id, psm, scid, mtu, mps, initial_credit))

        response = L2capCaptures.CreditBasedConnectionResponse()
        assertThat(self.control_channel).emits(response)
        channel = CertLeL2capChannel(self._device, scid,
                                     response.get().GetDestinationCid(), self._get_acl_stream(), self._le_acl,
                                     self.control_channel,
                                     response.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def open_channel_with_expected_result(self, psm=0x33, result=LeCreditBasedConnectionResponseResult.SUCCESS):
        """Send a connection request for ``psm`` and assert the response carries ``result``.

        No channel object is created or returned.
        """
        self.control_channel.send(l2cap_packets.LeCreditBasedConnectionRequestBuilder(1, psm, 0x40, 1000, 100, 6))

        response = L2capMatchers.CreditBasedConnectionResponse(result)
        assertThat(self.control_channel).emits(response)

    def verify_and_respond_open_channel_from_remote(self,
                                                    psm=0x33,
                                                    result=LeCreditBasedConnectionResponseResult.SUCCESS,
                                                    our_scid=None):
        """Expect a connection request for ``psm``, answer it with ``result`` and return the channel.

        The channel is created and registered regardless of ``result``; its
        initial credits are those carried in the remote's request.
        """
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        (scid, dcid) = self._respond_connection_request_default(request.get(), result, our_scid)
        channel = CertLeL2capChannel(self._device, scid, dcid, self._get_acl_stream(), self._le_acl,
                                     self.control_channel,
                                     request.get().GetInitialCredits())
        self._cid_to_cert_channels[scid] = channel
        return channel

    def verify_and_reject_open_channel_from_remote(self, psm=0x33):
        """Expect a connection request for ``psm`` and answer it with a 'command not understood' reject."""
        request = L2capCaptures.CreditBasedConnectionRequest(psm)
        assertThat(self.control_channel).emits(request)
        sid = request.get().GetIdentifier()
        reject = l2cap_packets.LeCommandRejectNotUnderstoodBuilder(sid)
        self.control_channel.send(reject)

    def verify_le_flow_control_credit(self, channel):
        """Assert that a flow control credit packet for ``channel``'s destination CID is emitted."""
        assertThat(self.control_channel).emits(L2capMatchers.LeFlowControlCredit(channel._dcid))

    def _respond_connection_request_default(self,
                                            request,
                                            result=LeCreditBasedConnectionResponseResult.SUCCESS,
                                            our_scid=None):
        """Answer ``request`` by echoing its MTU, MPS and initial credits.

        Returns:
            Tuple ``(our_scid, our_dcid)`` describing the new channel endpoints.
        """
        sid = request.GetIdentifier()
        their_scid = request.GetSourceCid()
        mtu = request.GetMtu()
        mps = request.GetMps()
        initial_credits = request.GetInitialCredits()
        # If our_scid is not specified, reuse their source CID as our source CID.
        if our_scid is None:
            our_scid = their_scid
        our_dcid = their_scid
        response = l2cap_packets.LeCreditBasedConnectionResponseBuilder(sid, our_scid, mtu, mps, initial_credits,
                                                                        result)
        self.control_channel.send(response)
        return (our_scid, our_dcid)

    def get_control_channel(self):
        """Return the control channel, or None if no ACL connection has been set up yet."""
        return self.control_channel

    def _get_acl_stream(self):
        """Return the event stream of the current LE ACL connection."""
        return self._le_acl.acl_stream

    def _on_disconnection_request_default(self, request):
        """Answer a disconnection request with a response echoing its identifier and CIDs."""
        disconnection_request = l2cap_packets.LeDisconnectionRequestView(request)
        sid = disconnection_request.GetIdentifier()
        scid = disconnection_request.GetSourceCid()
        dcid = disconnection_request.GetDestinationCid()
        response = l2cap_packets.LeDisconnectionResponseBuilder(sid, dcid, scid)
        self.control_channel.send(response)

    def _on_disconnection_response_default(self, request):
        """Handle a disconnection response; the packet is parsed but no action is taken."""
        # NOTE(review): the view was never read in the original either; it is
        # still constructed here so any side effect of parsing is unchanged.
        l2cap_packets.LeDisconnectionResponseView(request)

    def _on_credit(self, l2cap_le_control_view):
        """Add the credits from a flow control credit packet to the matching channel's counter."""
        credit_view = l2cap_packets.LeFlowControlCreditView(l2cap_le_control_view)
        # NOTE(review): channels are registered under our source CID; this
        # lookup assumes the CID carried in the credit packet equals that key -
        # confirm for channels whose source and destination CIDs differ.
        channel = self._cid_to_cert_channels.get(credit_view.GetCid())
        if channel is None:
            # Credits for a channel we do not track are ignored.
            return
        channel._credits_left += credit_view.GetCredits()

    def _handle_control_packet(self, l2cap_packet):
        """ACL stream callback: dispatch control-channel packets via ``control_table``.

        Packets on any other channel id, and command codes without a
        registered handler, are ignored.
        """
        packet_bytes = l2cap_packet.payload
        l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
        if l2cap_view.GetChannelId() != _LE_SIGNALLING_CID:
            return
        request = l2cap_packets.LeControlView(l2cap_view.GetPayload())
        fn = self.control_table.get(request.GetCode())
        if fn is not None:
            fn(request)