• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import grpc
18import logging
19
20from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices
21from bumble import pandora as bumble_server
22from bumble.gatt import Characteristic, Service
23from bumble.l2cap import L2CAP_Control_Frame
24from bumble.pairing import PairingConfig
25from bumble_experimental.gatt import GATTService
26from mobly import base_test, signals, test_runner
27from mobly.asserts import assert_equal  # type: ignore
28from mobly.asserts import assert_in  # type: ignore
29from mobly.asserts import assert_is_not_none  # type: ignore
30from mobly.asserts import assert_not_in  # type: ignore
31from mobly.asserts import assert_true  # type: ignore
32from pandora.host_pb2 import RANDOM, Connection, DataTypes
33from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer, SecureResponse
34from pandora_experimental.gatt_grpc import GATT
35from pandora_experimental.gatt_grpc_aio import GATT as AioGATT, add_GATTServicer_to_server
36from pandora_experimental.gatt_pb2 import SUCCESS, ReadCharacteristicsFromUuidResponse
37from typing import Optional, Tuple
38
39
class GattTest(base_test.BaseTestClass):  # type: ignore[misc]
    """GATT test cases exercised over LE connections between a DUT and a REF Pandora device."""

    # Set in `setup_class`; stays `None` if device creation never ran.
    devices: Optional[PandoraDevices] = None

    # pandora devices.
    dut: PandoraDevice
    ref: PandoraDevice

    def setup_class(self) -> None:
        """Register the experimental GATT servicer with Bumble and create the devices."""
        # Register experimental bumble servicers hook.
        bumble_server.register_servicer_hook(
            lambda bumble, _, server: add_GATTServicer_to_server(GATTService(bumble.device), server)
        )

        self.devices = PandoraDevices(self)
        # Only the first two devices are used; any extra ones are ignored.
        self.dut, self.ref, *_ = self.devices

    def teardown_class(self) -> None:
        """Stop all devices, if any were created."""
        if not self.devices:
            return
        self.devices.stop_all()

    @avatar.asynchronous
    async def setup_test(self) -> None:
        """Reset DUT and REF concurrently."""
        pending_resets = (self.dut.reset(), self.ref.reset())
        await asyncio.gather(*pending_resets)

    def test_print_dut_gatt_services(self) -> None:
        """Connect DUT to an advertising REF and log the result of `DiscoverServices` issued via the DUT."""
        ref_advertisement = self.ref.host.Advertise(legacy=True, connectable=True)
        connect_response = self.dut.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM)
        dut_to_ref = connect_response.connection
        assert_is_not_none(dut_to_ref)
        assert dut_to_ref  # narrows the Optional for the type checker
        ref_advertisement.cancel()

        dut_gatt = GATT(self.dut.channel)
        discovered = dut_gatt.DiscoverServices(dut_to_ref)
        self.dut.log.info(f'DUT services: {discovered}')

    def test_print_ref_gatt_services(self) -> None:
        """Connect REF to an advertising DUT and log the result of `DiscoverServices` issued via the REF."""
        dut_advertisement = self.dut.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=RANDOM,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        # Scan from REF until a report carrying the DUT's manufacturer data shows up.
        ref_scan = self.ref.host.Scan()
        matching_reports = (report for report in ref_scan if b'pause cafe' in report.data.manufacturer_specific_data)
        dut_report = next(matching_reports)
        ref_scan.cancel()

        connect_response = self.ref.host.ConnectLE(own_address_type=RANDOM, **dut_report.address_asdict())
        ref_to_dut = connect_response.connection
        assert_is_not_none(ref_to_dut)
        assert ref_to_dut  # narrows the Optional for the type checker
        dut_advertisement.cancel()

        ref_gatt = GATT(self.ref.channel)
        discovered = ref_gatt.DiscoverServices(ref_to_dut)
        self.ref.log.info(f'REF services: {discovered}')

    async def connect_dut_to_ref(self) -> Tuple[Connection, Connection]:
        """Make REF advertise and connect DUT to it.

        Returns:
            A pair `(dut_connection_to_ref, ref_connection_to_dut)`: the connection
            as reported by the DUT's `ConnectLE` call and the one reported by the
            REF's advertisement stream.
        """
        ref_advertisement = self.ref.aio.host.Advertise(
            legacy=True,
            connectable=True,
        )

        connect_response = await self.dut.aio.host.ConnectLE(public=self.ref.address, own_address_type=RANDOM)
        dut_connection_to_ref = connect_response.connection
        assert_is_not_none(dut_connection_to_ref)
        assert dut_connection_to_ref  # narrows the Optional for the type checker

        # The first item on the advertisement stream carries REF's view of the connection.
        advertise_response = await anext(aiter(ref_advertisement))
        ref_connection_to_dut = advertise_response.connection
        ref_advertisement.cancel()

        return dut_connection_to_ref, ref_connection_to_dut

    @avatar.asynchronous
    async def test_read_characteristic_while_pairing(self) -> None:
        """Start a read of an encryption-protected characteristic while pairing is pending.

        The read is issued from the DUT after the first pairing event arrives
        but before it is answered; the test then confirms pairing and expects
        the read to succeed and the REF-initiated `Secure` call to report success.
        """
        if isinstance(self.dut, BumblePandoraDevice):
            raise signals.TestSkip('TODO: b/273941061')
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # arrange: set up GATT service on REF side with a characteristic
        # that can only be read after pairing
        SERVICE_UUID = "00005A00-0000-1000-8000-00805F9B34FB"
        CHARACTERISTIC_UUID = "00006A00-0000-1000-8000-00805F9B34FB"
        protected_characteristic = Characteristic(
            CHARACTERISTIC_UUID,
            Characteristic.READ,
            Characteristic.READ_REQUIRES_ENCRYPTION,
            b"Hello, world!",
        )
        self.ref.device.add_service(Service(SERVICE_UUID, [protected_characteristic]))  # type:ignore
        # disable MITM requirement on REF side (since it only does just works)
        self.ref.device.pairing_config_factory = lambda _: PairingConfig(  # type:ignore
            sc=True, mitm=False, bonding=True
        )
        # manually handle pairing on the DUT side
        dut_pairing_events = self.dut.aio.security.OnPairing()
        # set up connection
        dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref()

        # act: initiate pairing from REF side (send a security request)
        async def secure_from_ref() -> SecureResponse:
            return await self.ref.aio.security.Secure(connection=ref_connection_to_dut, le=LE_LEVEL3)

        ref_secure_task = asyncio.create_task(secure_from_ref())

        # wait for pairing to start
        pairing_event = await anext(dut_pairing_events)

        # before acknowledging pairing, start a GATT read
        dut_gatt = AioGATT(self.dut.aio.channel)

        async def read_from_dut() -> ReadCharacteristicsFromUuidResponse:
            return await dut_gatt.ReadCharacteristicsFromUuid(dut_connection_to_ref, CHARACTERISTIC_UUID, 1, 0xFFFF)

        dut_read_task = asyncio.create_task(read_from_dut())

        # leave the read pending for a while with the pairing request still unanswered
        await asyncio.sleep(3)

        # now continue with pairing
        dut_pairing_events.send_nowait(PairingEventAnswer(event=pairing_event, confirm=True))

        # android pops up a second pairing notification for some reason, accept it
        pairing_event = await anext(dut_pairing_events)
        dut_pairing_events.send_nowait(PairingEventAnswer(event=pairing_event, confirm=True))

        # assert: that the read succeeded (so Android re-tried the read after pairing)
        read_response = await dut_read_task
        self.ref.log.info(read_response)
        first_read = read_response.characteristics_read[0]
        assert_equal(first_read.status, SUCCESS)
        assert_equal(first_read.value.value, b"Hello, world!")

        # make sure pairing was successful
        secure_response = await ref_secure_task
        assert_equal(secure_response.result_variant(), 'success')

    @avatar.asynchronous
    async def test_rediscover_whenever_unbonded(self) -> None:
        """Check that an unbonded DUT sees a service added on REF between two connections."""
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # arrange: set up one GATT service on REF side
        dut_gatt = AioGATT(self.dut.aio.channel)
        SERVICE_UUID_1 = "00005A00-0000-1000-8000-00805F9B34FB"
        SERVICE_UUID_2 = "00005A01-0000-1000-8000-00805F9B34FB"
        self.ref.device.add_service(Service(SERVICE_UUID_1, []))  # type:ignore
        # connect both devices
        dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref()

        # act: perform service discovery, disconnect, add the second service, reconnect, and try discovery again
        first_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref)
        await self.ref.aio.host.Disconnect(ref_connection_to_dut)
        self.ref.device.add_service(Service(SERVICE_UUID_2, []))  # type:ignore
        dut_connection_to_ref, _ = await self.connect_dut_to_ref()
        second_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref)

        # assert: that we found only one service in the first discovery
        assert_in(SERVICE_UUID_1, (found.uuid for found in first_discovery.services))
        assert_not_in(SERVICE_UUID_2, (found.uuid for found in first_discovery.services))
        # assert: but found both in the second discovery
        assert_in(SERVICE_UUID_1, (found.uuid for found in second_discovery.services))
        assert_in(SERVICE_UUID_2, (found.uuid for found in second_discovery.services))

    @avatar.asynchronous
    async def test_do_not_discover_when_bonded(self) -> None:
        """Bonded counterpart of `test_rediscover_whenever_unbonded`; currently always skipped."""
        # NOTE: if service change indication is ever enabled in Bumble, both this test + the previous test must
        # DISABLE IT, otherwise this test will fail, and the previous test will pass even on a broken implementation

        # Everything below this unconditional skip is unreachable until the TODO is resolved.
        raise signals.TestSkip('TODO(aryarahul): b/276757181')
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')

        # arrange: set up one GATT service on REF side
        dut_gatt = AioGATT(self.dut.aio.channel)
        SERVICE_UUID_1 = "00005A00-0000-1000-8000-00805F9B34FB"
        SERVICE_UUID_2 = "00005A01-0000-1000-8000-00805F9B34FB"
        self.ref.device.add_service(Service(SERVICE_UUID_1, []))  # type:ignore
        # connect both devices
        dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref()
        # bond devices and disconnect
        await self.dut.aio.security.Secure(connection=dut_connection_to_ref, le=LE_LEVEL3)
        await self.ref.aio.host.Disconnect(ref_connection_to_dut)

        # act: connect, perform service discovery, disconnect, add the second service, reconnect, and try discovery again
        dut_connection_to_ref, ref_connection_to_dut = await self.connect_dut_to_ref()
        first_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref)
        await self.ref.aio.host.Disconnect(ref_connection_to_dut)

        self.ref.device.add_service(Service(SERVICE_UUID_2, []))  # type:ignore
        dut_connection_to_ref, _ = await self.connect_dut_to_ref()
        second_discovery = await dut_gatt.DiscoverServices(dut_connection_to_ref)

        # assert: that we found only one service in the first discovery
        assert_in(SERVICE_UUID_1, (found.uuid for found in first_discovery.services))
        assert_not_in(SERVICE_UUID_2, (found.uuid for found in first_discovery.services))
        # assert: but found both in the second discovery
        # NOTE(review): these two checks are the same as in test_rediscover_whenever_unbonded; given this
        # test's name and the NOTE at its top, the second discovery presumably should NOT contain
        # SERVICE_UUID_2 - confirm the intended expectation before removing the skip.
        assert_in(SERVICE_UUID_1, (found.uuid for found in second_discovery.services))
        assert_in(SERVICE_UUID_2, (found.uuid for found in second_discovery.services))

    @avatar.asynchronous
    async def test_eatt_when_not_encrypted_no_timeout(self) -> None:
        """Send an EATT credit-based connection request over an unencrypted link.

        Expects the response frame to carry result 0x05 and the link to stay up
        afterwards (checked through `is_connected`).
        """
        if not isinstance(self.ref, BumblePandoraDevice):
            raise signals.TestSkip('Test require Bumble as reference device(s)')
        dut_advertisement = self.dut.aio.host.Advertise(
            legacy=True,
            connectable=True,
            own_address_type=RANDOM,
            data=DataTypes(manufacturer_specific_data=b'pause cafe'),
        )

        # Scan from REF until a report carrying the DUT's manufacturer data shows up.
        ref_scan = self.ref.aio.host.Scan()
        matching_reports = (
            report async for report in ref_scan if b'pause cafe' in report.data.manufacturer_specific_data
        )
        dut_report = await anext(matching_reports)
        ref_scan.cancel()

        connect_response = await self.ref.aio.host.ConnectLE(own_address_type=RANDOM, **dut_report.address_asdict())
        ref_dut = connect_response.connection
        assert_is_not_none(ref_dut)
        assert ref_dut  # narrows the Optional for the type checker
        dut_advertisement.cancel()

        # The Pandora connection cookie is decoded as a big-endian integer to look up the Bumble connection.
        cookie_as_int = int.from_bytes(ref_dut.cookie.value, 'big')
        bumble_connection = self.ref.device.lookup_connection(cookie_as_int)
        assert bumble_connection

        connection_request = L2CAP_Control_Frame.from_bytes(
            (
                b"\x17"  # code of L2CAP_CREDIT_BASED_CONNECTION_REQ
                b"\x01"  # identifier
                b"\x0a\x00"  # data length
                b"\x27\x00"  # psm(EATT)
                b"\x64\x00"  # MTU
                b"\x64\x00"  # MPS
                b"\x64\x00"  # initial credit
                b"\x40\x00"  # source cid[0]
            )
        )

        # Capture the frame handed to the "on_[0x18]" handler
        # (presumably the credit-based connection response - confirm against Bumble's dispatch).
        response_future = asyncio.get_running_loop().create_future()
        setattr(
            self.ref.device.l2cap_channel_manager,
            "on_[0x18]",
            lambda _, _1, frame: response_future.set_result(frame),
        )
        # 0x05 is the channel id the request is sent on (presumably LE signaling - confirm).
        self.ref.device.l2cap_channel_manager.send_control_frame(  # type:ignore
            bumble_connection, 0x05, connection_request
        )
        control_frame = await response_future

        assert_equal(bytes(control_frame)[10], 0x05)  # All connections refused – insufficient authentication
        assert_true(await is_connected(self.ref, ref_dut), "Device is no longer connected")
289
290
async def is_connected(device: PandoraDevice, connection: Connection) -> bool:
    """Report whether `connection` is still up on `device`.

    Waits up to 5 seconds for a disconnection. If `WaitDisconnection` returns,
    the link is considered down; if it raises a `grpc.RpcError`, the error is
    asserted to be `DEADLINE_EXCEEDED` and the link is considered still up.
    """
    try:
        await device.aio.host.WaitDisconnection(connection=connection, timeout=5)
    except grpc.RpcError as rpc_error:
        # Only a timeout means "still connected"; any other RPC error fails the assertion.
        assert_equal(rpc_error.code(), grpc.StatusCode.DEADLINE_EXCEEDED)  # type: ignore
        return True
    else:
        return False
298
299
if __name__ == '__main__':
    # Run as a script: enable debug-level logging, then hand control to the Mobly test runner.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
303