• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2#   Copyright 2021 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from blueberry.tests.gd.cert.matchers import IsoMatchers
17from blueberry.tests.gd.cert.metadata import metadata
18from blueberry.tests.gd.cert.py_l2cap import PyLeL2cap
19from blueberry.tests.gd.cert.py_le_iso import PyLeIso
20from blueberry.tests.gd.cert.py_le_iso import CisTestParameters
21from blueberry.tests.gd.cert.truth import assertThat
22from blueberry.tests.gd.cert import gd_base_test
23from blueberry.tests.gd.iso.cert_le_iso import CertLeIso
24from blueberry.tests.gd.l2cap.le.cert_le_l2cap import CertLeL2cap
25from blueberry.facade import common_pb2 as common
26from blueberry.facade.hci import controller_facade_pb2 as controller_facade
27from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
28from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
29from mobly import asserts
30from mobly import test_runner
31import hci_packets as hci
32
33
34class LeIsoTest(gd_base_test.GdBaseTestClass):
35
36    def setup_class(self):
37        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='L2CAP', cert_module='HCI_INTERFACES')
38
39    def setup_test(self):
40        gd_base_test.GdBaseTestClass.setup_test(self)
41
42        self.dut_l2cap = PyLeL2cap(self.dut)
43        self.cert_l2cap = CertLeL2cap(self.cert)
44        self.dut_address = common.BluetoothAddressWithType(
45            address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS)
46        self.cert_address = common.BluetoothAddressWithType(
47            address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS)
48        dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy(
49            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
50            address_with_type=self.dut_address,
51            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
52            minimum_rotation_time=0,
53            maximum_rotation_time=0)
54        self.dut_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy)
55        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
56            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
57            address_with_type=self.cert_address,
58            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
59            minimum_rotation_time=0,
60            maximum_rotation_time=0)
61        self.cert_l2cap._device.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
62
63        self.dut_iso = PyLeIso(self.dut)
64        self.cert_iso = CertLeIso(self.cert)
65
66    def teardown_test(self):
67        self.dut_iso.close()
68        self.cert_iso.close()
69
70        self.cert_l2cap.close()
71        self.dut_l2cap.close()
72        gd_base_test.GdBaseTestClass.teardown_test(self)
73
74    #cert becomes central of connection, dut peripheral
75    def _setup_link_from_cert(self):
76        # DUT Advertises
77        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(bytes(b'Im_The_DUT')))
78        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
79        config = le_advertising_facade.AdvertisingConfig(
80            advertisement=[gap_data],
81            interval_min=512,
82            interval_max=768,
83            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
84            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
85            channel_map=7,
86            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
87        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
88        create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
89        self.cert_l2cap.connect_le_acl(self.dut_address)
90
91    def _setup_cis_from_cert(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy,
92                             packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id,
93                             max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m):
94
95        self.cert_iso.le_set_cig_parameters(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m,
96                                            peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s,
97                                            max_transport_latency_s_to_m, cis_id, max_sdu_m_to_s, max_sdu_s_to_m,
98                                            phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m)
99
100        cis_handles = self.cert_iso.wait_le_set_cig_parameters_complete()
101
102        cis_handle = cis_handles[0]
103
104        acl_connection_handle = self.cert_l2cap._le_acl.handle
105        self.cert_iso.le_cretate_cis([(cis_handle, acl_connection_handle)])
106        dut_cis_stream = self.dut_iso.wait_le_cis_established()
107        cert_cis_stream = self.cert_iso.wait_le_cis_established()
108        return (dut_cis_stream, cert_cis_stream)
109
110    def skip_if_iso_not_supported(self):
111        supported = self.dut.hci_controller.IsSupportedCommand(
112            controller_facade.OpCodeMsg(op_code=int(hci.OpCode.LE_SET_CIG_PARAMETERS)))
113        if (not supported.supported):
114            asserts.skip("Skipping this test.  The chip doesn't support LE ISO")
115
116    @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-01-C",
117              pts_test_name="connected isochronous stream, unframed data, peripheral role")
118    def test_iso_cis_unf_sla_bv_01_c(self):
119        self.skip_if_iso_not_supported()
120        """
121            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
122        """
123        cig_id = 0x01
124        sdu_interval_m_to_s = 0
125        sdu_interval_s_to_m = 0x186a
126        peripherals_clock_accuracy = 0
127        packing = 0
128        framing = 0
129        max_transport_latency_m_to_s = 0
130        max_transport_latency_s_to_m = 0
131        cis_id = 0x01
132        max_sdu_m_to_s = 100
133        max_sdu_s_to_m = 100
134        phy_m_to_s = 0x02
135        phy_s_to_m = 0x00
136        bn_m_to_s = 0
137        bn_s_to_m = 2
138
139        self._setup_link_from_cert()
140        (dut_cis_stream,
141         cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m,
142                                                      peripherals_clock_accuracy, packing, framing,
143                                                      max_transport_latency_m_to_s, max_transport_latency_s_to_m,
144                                                      cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m,
145                                                      bn_m_to_s, bn_s_to_m)
146        dut_cis_stream.send(b'abcdefgh' * 10)
147        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
148
149    @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-25-C",
150              pts_test_name="connected isochronous stream, unframed data, peripheral role")
151    def test_iso_cis_unf_sla_bv_25_c(self):
152        self.skip_if_iso_not_supported()
153        """
154            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
155        """
156        cig_id = 0x01
157        sdu_interval_m_to_s = 0x7530
158        sdu_interval_s_to_m = 0x7530
159        peripherals_clock_accuracy = 0
160        packing = 0
161        framing = 0
162        max_transport_latency_m_to_s = 0
163        max_transport_latency_s_to_m = 0
164        cis_id = 0x01
165        max_sdu_m_to_s = 100
166        max_sdu_s_to_m = 100
167        phy_m_to_s = 0x02
168        phy_s_to_m = 0x00
169        bn_m_to_s = 3
170        bn_s_to_m = 1
171
172        self._setup_link_from_cert()
173        (dut_cis_stream,
174         cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m,
175                                                      peripherals_clock_accuracy, packing, framing,
176                                                      max_transport_latency_m_to_s, max_transport_latency_s_to_m,
177                                                      cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m,
178                                                      bn_m_to_s, bn_s_to_m)
179        dut_cis_stream.send(b'abcdefgh' * 10)
180        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
181
182    @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-03-C",
183              pts_test_name="connected isochronous stream, framed data, peripheral role")
184    def test_iso_cis_fra_sla_bv_03_c(self):
185        self.skip_if_iso_not_supported()
186        """
187            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
188        """
189        cig_id = 0x01
190        sdu_interval_m_to_s = 0x0000
191        sdu_interval_s_to_m = 0x4e30
192        peripherals_clock_accuracy = 0
193        packing = 0
194        framing = 1
195        max_transport_latency_m_to_s = 0
196        max_transport_latency_s_to_m = 0
197        cis_id = 0x01
198        max_sdu_m_to_s = 100
199        max_sdu_s_to_m = 100
200        phy_m_to_s = 0x02
201        phy_s_to_m = 0x00
202        bn_m_to_s = 0
203        bn_s_to_m = 2
204
205        self._setup_link_from_cert()
206        (dut_cis_stream,
207         cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m,
208                                                      peripherals_clock_accuracy, packing, framing,
209                                                      max_transport_latency_m_to_s, max_transport_latency_s_to_m,
210                                                      cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m,
211                                                      bn_m_to_s, bn_s_to_m)
212        dut_cis_stream.send(b'abcdefgh' * 10)
213        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
214
215    @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-26-C",
216              pts_test_name="connected isochronous stream, framed data, peripheral role")
217    def test_iso_cis_fra_sla_bv_26_c(self):
218        self.skip_if_iso_not_supported()
219        """
220            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
221        """
222        cig_id = 0x01
223        sdu_interval_m_to_s = 0x14D5
224        sdu_interval_s_to_m = 0x14D5
225        peripherals_clock_accuracy = 0
226        packing = 0
227        framing = 1
228        max_transport_latency_m_to_s = 0
229        max_transport_latency_s_to_m = 0
230        cis_id = 0x01
231        max_sdu_m_to_s = 100
232        max_sdu_s_to_m = 100
233        phy_m_to_s = 0x02
234        phy_s_to_m = 0x00
235        bn_m_to_s = 1
236        bn_s_to_m = 1
237
238        self._setup_link_from_cert()
239        (dut_cis_stream,
240         cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m,
241                                                      peripherals_clock_accuracy, packing, framing,
242                                                      max_transport_latency_m_to_s, max_transport_latency_s_to_m,
243                                                      cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m,
244                                                      bn_m_to_s, bn_s_to_m)
245        dut_cis_stream.send(b'abcdefgh' * 10)
246        assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10))
247
248
249if __name__ == '__main__':
250    test_runner.main()
251