1# 2# Copyright 2021 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from blueberry.tests.gd.cert.matchers import IsoMatchers 17from blueberry.tests.gd.cert.metadata import metadata 18from blueberry.tests.gd.cert.py_l2cap import PyLeL2cap 19from blueberry.tests.gd.cert.py_le_iso import PyLeIso 20from blueberry.tests.gd.cert.py_le_iso import CisTestParameters 21from blueberry.tests.gd.cert.truth import assertThat 22from blueberry.tests.gd.cert import gd_base_test 23from blueberry.tests.gd.iso.cert_le_iso import CertLeIso 24from blueberry.tests.gd.l2cap.le.cert_le_l2cap import CertLeL2cap 25from blueberry.facade import common_pb2 as common 26from blueberry.facade.hci import controller_facade_pb2 as controller_facade 27from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade 28from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade 29from mobly import asserts 30from mobly import test_runner 31import hci_packets as hci 32 33 34class LeIsoTest(gd_base_test.GdBaseTestClass): 35 36 def setup_class(self): 37 gd_base_test.GdBaseTestClass.setup_class(self, dut_module='L2CAP', cert_module='HCI_INTERFACES') 38 39 def setup_test(self): 40 gd_base_test.GdBaseTestClass.setup_test(self) 41 42 self.dut_l2cap = PyLeL2cap(self.dut) 43 self.cert_l2cap = CertLeL2cap(self.cert) 44 self.dut_address = common.BluetoothAddressWithType( 45 address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) 46 self.cert_address = common.BluetoothAddressWithType( 47 address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS) 48 dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy( 49 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 50 address_with_type=self.dut_address, 51 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 52 minimum_rotation_time=0, 53 maximum_rotation_time=0) 54 self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy) 55 privacy_policy = le_initiator_address_facade.PrivacyPolicy( 56 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 57 address_with_type=self.cert_address, 58 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 59 minimum_rotation_time=0, 60 maximum_rotation_time=0) 61 self.cert_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) 62 63 self.dut_iso = PyLeIso(self.dut) 64 self.cert_iso = CertLeIso(self.cert) 65 66 def teardown_test(self): 67 self.dut_iso.close() 68 self.cert_iso.close() 69 70 self.cert_l2cap.close() 71 self.dut_l2cap.close() 72 gd_base_test.GdBaseTestClass.teardown_test(self) 73 74 #cert becomes central of connection, dut peripheral 75 def _setup_link_from_cert(self): 76 # DUT Advertises 77 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(bytes(b'Im_The_DUT'))) 78 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 79 config = le_advertising_facade.AdvertisingConfig( 80 advertisement=[gap_data], 81 interval_min=512, 82 interval_max=768, 83 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 84 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 85 channel_map=7, 86 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 87 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 88 create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) 89 self.cert_l2cap.connect_le_acl(self.dut_address) 90 91 def _setup_cis_from_cert(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy, 92 packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id, 93 max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m): 94 95 self.cert_iso.le_set_cig_parameters(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 96 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, 97 max_transport_latency_s_to_m, cis_id, max_sdu_m_to_s, max_sdu_s_to_m, 98 phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m) 99 100 cis_handles = self.cert_iso.wait_le_set_cig_parameters_complete() 101 102 cis_handle = cis_handles[0] 103 104 acl_connection_handle = self.cert_l2cap._le_acl.handle 105 self.cert_iso.le_cretate_cis([(cis_handle, acl_connection_handle)]) 106 dut_cis_stream = self.dut_iso.wait_le_cis_established() 107 cert_cis_stream = self.cert_iso.wait_le_cis_established() 108 return (dut_cis_stream, cert_cis_stream) 109 110 def skip_if_iso_not_supported(self): 111 supported = self.dut.hci_controller.IsSupportedCommand( 112 controller_facade.OpCodeMsg(op_code=int(hci.OpCode.LE_SET_CIG_PARAMETERS))) 113 if (not supported.supported): 114 asserts.skip("Skipping this test. The chip doesn't support LE ISO") 115 116 @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-01-C", 117 pts_test_name="connected isochronous stream, unframed data, peripheral role") 118 def test_iso_cis_unf_sla_bv_01_c(self): 119 self.skip_if_iso_not_supported() 120 """ 121 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 122 """ 123 cig_id = 0x01 124 sdu_interval_m_to_s = 0 125 sdu_interval_s_to_m = 0x186a 126 peripherals_clock_accuracy = 0 127 packing = 0 128 framing = 0 129 max_transport_latency_m_to_s = 0 130 max_transport_latency_s_to_m = 0 131 cis_id = 0x01 132 max_sdu_m_to_s = 100 133 max_sdu_s_to_m = 100 134 phy_m_to_s = 0x02 135 phy_s_to_m = 0x00 136 bn_m_to_s = 0 137 bn_s_to_m = 2 138 139 self._setup_link_from_cert() 140 (dut_cis_stream, 141 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 142 peripherals_clock_accuracy, packing, framing, 143 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 144 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 145 bn_m_to_s, bn_s_to_m) 146 dut_cis_stream.send(b'abcdefgh' * 10) 147 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 148 149 @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-25-C", 150 pts_test_name="connected isochronous stream, unframed data, peripheral role") 151 def test_iso_cis_unf_sla_bv_25_c(self): 152 self.skip_if_iso_not_supported() 153 """ 154 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 155 """ 156 cig_id = 0x01 157 sdu_interval_m_to_s = 0x7530 158 sdu_interval_s_to_m = 0x7530 159 peripherals_clock_accuracy = 0 160 packing = 0 161 framing = 0 162 max_transport_latency_m_to_s = 0 163 max_transport_latency_s_to_m = 0 164 cis_id = 0x01 165 max_sdu_m_to_s = 100 166 max_sdu_s_to_m = 100 167 phy_m_to_s = 0x02 168 phy_s_to_m = 0x00 169 bn_m_to_s = 3 170 bn_s_to_m = 1 171 172 self._setup_link_from_cert() 173 (dut_cis_stream, 174 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 175 peripherals_clock_accuracy, packing, framing, 176 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 177 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 178 bn_m_to_s, bn_s_to_m) 179 dut_cis_stream.send(b'abcdefgh' * 10) 180 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 181 182 @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-03-C", 183 pts_test_name="connected isochronous stream, framed data, peripheral role") 184 def test_iso_cis_fra_sla_bv_03_c(self): 185 self.skip_if_iso_not_supported() 186 """ 187 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 188 """ 189 cig_id = 0x01 190 sdu_interval_m_to_s = 0x0000 191 sdu_interval_s_to_m = 0x4e30 192 peripherals_clock_accuracy = 0 193 packing = 0 194 framing = 1 195 max_transport_latency_m_to_s = 0 196 max_transport_latency_s_to_m = 0 197 cis_id = 0x01 198 max_sdu_m_to_s = 100 199 max_sdu_s_to_m = 100 200 phy_m_to_s = 0x02 201 phy_s_to_m = 0x00 202 bn_m_to_s = 0 203 bn_s_to_m = 2 204 205 self._setup_link_from_cert() 206 (dut_cis_stream, 207 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 208 peripherals_clock_accuracy, packing, framing, 209 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 210 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 211 bn_m_to_s, bn_s_to_m) 212 dut_cis_stream.send(b'abcdefgh' * 10) 213 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 214 215 @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-26-C", 216 pts_test_name="connected isochronous stream, framed data, peripheral role") 217 def test_iso_cis_fra_sla_bv_26_c(self): 218 self.skip_if_iso_not_supported() 219 """ 220 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 221 """ 222 cig_id = 0x01 223 sdu_interval_m_to_s = 0x14D5 224 sdu_interval_s_to_m = 0x14D5 225 peripherals_clock_accuracy = 0 226 packing = 0 227 framing = 1 228 max_transport_latency_m_to_s = 0 229 max_transport_latency_s_to_m = 0 230 cis_id = 0x01 231 max_sdu_m_to_s = 100 232 max_sdu_s_to_m = 100 233 phy_m_to_s = 0x02 234 phy_s_to_m = 0x00 235 bn_m_to_s = 1 236 bn_s_to_m = 1 237 238 self._setup_link_from_cert() 239 (dut_cis_stream, 240 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 241 peripherals_clock_accuracy, packing, framing, 242 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 243 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 244 bn_m_to_s, bn_s_to_m) 245 dut_cis_stream.send(b'abcdefgh' * 10) 246 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 247 248 249if __name__ == '__main__': 250 test_runner.main() 251