• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import asyncio
16import grpc
17import logging
18
19from bumble.att import Attribute
20from bumble.core import ProtocolError
21from bumble.device import Connection as BumbleConnection, Device, Peer
22from bumble.gatt import Characteristic, Descriptor, Service
23from bumble.gatt_client import CharacteristicProxy, ServiceProxy
24from bumble.pandora import utils
25from pandora_experimental.gatt_grpc_aio import GATTServicer
26from pandora_experimental.gatt_pb2 import (
27    ATTRIBUTE_NOT_FOUND,
28    SUCCESS,
29    AttStatusCode,
30    AttValue,
31    ClearCacheRequest,
32    ClearCacheResponse,
33    DiscoverServiceByUuidRequest,
34    DiscoverServicesRequest,
35    DiscoverServicesResponse,
36    ExchangeMTURequest,
37    ExchangeMTUResponse,
38    GattCharacteristic,
39    GattCharacteristicDescriptor,
40    GattService,
41    IndicateOnCharacteristicRequest,
42    IndicateOnCharacteristicResponse,
43    NotifyOnCharacteristicRequest,
44    NotifyOnCharacteristicResponse,
45    ReadCharacteristicDescriptorRequest,
46    ReadCharacteristicDescriptorResponse,
47    ReadCharacteristicRequest,
48    ReadCharacteristicResponse,
49    ReadCharacteristicsFromUuidRequest,
50    ReadCharacteristicsFromUuidResponse,
51    RegisterServiceRequest,
52    RegisterServiceResponse,
53    WriteRequest,
54    WriteResponse,
55)
56from typing import Dict, List
57
58
class GATTService(GATTServicer):
    """gRPC servicer exposing GATT client and server operations of a Bumble device.

    One `Peer` is kept per connection handle; entries are created from the
    device's 'connection' event and dropped from its 'disconnection' event.
    RPCs that target a remote device identify the connection through
    `request.connection.cookie.value`, decoded as a big-endian integer handle.
    """

    device: Device
    peers: Dict[int, Peer]

    def __init__(self, device: Device) -> None:
        """Store `device` and subscribe to its connection lifecycle events."""
        super().__init__()
        self.device = device
        self.peers: Dict[int, Peer] = {}
        self.device.on('connection', self.on_connection)  # type: ignore
        self.device.on('disconnection', self.on_disconnection)  # type: ignore

    def __del__(self) -> None:
        """Unsubscribe the listeners registered by `__init__`."""
        self.device.remove_listener('connection', self.on_connection)  # type: ignore
        self.device.remove_listener('disconnection', self.on_disconnection)  # type: ignore

    def on_connection(self, connection: BumbleConnection) -> None:
        """Create and remember a `Peer` for the new connection."""
        self.peers[connection.handle] = Peer(connection)  # type: ignore[no-untyped-call]

    def on_disconnection(self, connection: BumbleConnection) -> None:
        """Forget the `Peer` associated with `connection`, if any."""
        # `pop` with a default: a handle that was never registered must not
        # raise `KeyError` from inside the event handler.
        self.peers.pop(connection.handle, None)

    def _peer_for(self, rpc_name: str, cookie: bytes) -> Peer:
        """Resolve the `Peer` addressed by a request's connection cookie.

        Args:
            rpc_name: Name of the calling RPC, used as the log message prefix.
            cookie: Raw cookie bytes holding the big-endian connection handle.

        Returns:
            The `Peer` registered for the connection.

        Raises:
            AssertionError: The device has no connection for the handle.
            KeyError: The connection exists but no `Peer` was registered for it.
        """
        connection_handle = int.from_bytes(cookie, 'big')
        logging.info(f"{rpc_name}: {connection_handle}")

        connection = self.device.lookup_connection(connection_handle)
        if not connection:
            # Explicit raise instead of `assert`: the check must survive
            # `python -O`, while keeping the exception type unchanged.
            raise AssertionError(f"no connection with handle {connection_handle}")
        return self.peers[connection.handle]

    @staticmethod
    async def _discover_attributes(peer: Peer, services: List[ServiceProxy]) -> None:
        """Discover the characteristics and descriptors of every service in `services`."""

        async def feed_service(service: ServiceProxy) -> None:
            characteristic: CharacteristicProxy
            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
                await characteristic.discover_descriptors()  # type: ignore[no-untyped-call]

        await asyncio.gather(*(feed_service(service) for service in services))

    @staticmethod
    def _to_discover_response(services: List[ServiceProxy]) -> DiscoverServicesResponse:
        """Convert discovered service proxies into a `DiscoverServicesResponse`."""
        return DiscoverServicesResponse(services=[
            GattService(
                handle=service.handle,
                type=int.from_bytes(bytes(service.type), 'little'),
                uuid=service.uuid.to_hex_str('-'),  # type: ignore
                characteristics=[
                    GattCharacteristic(
                        properties=characteristic.properties,  # type: ignore
                        permissions=0,  # TODO
                        uuid=characteristic.uuid.to_hex_str('-'),  # type: ignore
                        handle=characteristic.handle,  # type: ignore
                        descriptors=[
                            GattCharacteristicDescriptor(
                                handle=descriptor.handle,  # type: ignore
                                permissions=0,  # TODO
                                uuid=str(descriptor.type),  # type: ignore
                            ) for descriptor in characteristic.descriptors  # type: ignore
                        ],
                    ) for characteristic in service.characteristics  # type: ignore
                ],
            ) for service in services
        ])

    @utils.rpc
    async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse:
        """Request `request.mtu` from the peer and require that exact value back.

        Raises:
            AssertionError: The connection is unknown, or the value returned by
                `request_mtu` differs from `request.mtu`.
        """
        peer = self._peer_for("ExchangeMTU", request.connection.cookie.value)

        mtu = await peer.request_mtu(request.mtu)  # type: ignore
        if mtu != request.mtu:
            raise AssertionError(f"MTU exchange returned {mtu}, expected {request.mtu}")

        return ExchangeMTUResponse()

    @utils.rpc
    async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse:
        """Write `request.value` to `request.handle` with response.

        A `ProtocolError` from the write is reported through the response
        `status` (its `error_code`) rather than raised.
        """
        peer = self._peer_for("WriteAttFromHandle", request.connection.cookie.value)

        try:
            await peer.write_value(request.handle, request.value, with_response=True)  # type: ignore
            status: AttStatusCode = SUCCESS
        except ProtocolError as e:
            status = e.error_code  # type: ignore

        return WriteResponse(handle=request.handle, status=status)

    @utils.rpc
    async def DiscoverServiceByUuid(self, request: DiscoverServiceByUuidRequest,
                                    context: grpc.ServicerContext) -> DiscoverServicesResponse:
        """Discover the services matching `request.uuid`, with their characteristics and descriptors."""
        peer = self._peer_for("DiscoverServiceByUuid", request.connection.cookie.value)

        services: List[ServiceProxy] = await peer.discover_service(request.uuid)  # type: ignore
        await self._discover_attributes(peer, services)

        return self._to_discover_response(services)

    @utils.rpc
    async def DiscoverServices(self, request: DiscoverServicesRequest,
                               context: grpc.ServicerContext) -> DiscoverServicesResponse:
        """Discover all services of the peer, with their characteristics and descriptors."""
        peer = self._peer_for("DiscoverServices", request.connection.cookie.value)

        services: List[ServiceProxy] = await peer.discover_services()  # type: ignore
        await self._discover_attributes(peer, services)

        return self._to_discover_response(services)

    # TODO: implement `DiscoverServicesSdp`

    @utils.rpc
    async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse:
        """Log the call and return an empty response; nothing is cleared here."""
        logging.info("ClearCache")
        return ClearCacheResponse()

    @utils.rpc
    async def ReadCharacteristicFromHandle(self, request: ReadCharacteristicRequest,
                                           context: grpc.ServicerContext) -> ReadCharacteristicResponse:
        """Read the attribute at `request.handle`.

        On `ProtocolError` the response carries an empty value and the error's
        `error_code` as status.
        """
        peer = self._peer_for("ReadCharacteristicFromHandle", request.connection.cookie.value)

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status: AttStatusCode = SUCCESS
        except ProtocolError as e:
            value = bytes()
            status = e.error_code  # type: ignore

        return ReadCharacteristicResponse(value=AttValue(value=value), status=status)

    @utils.rpc
    async def ReadCharacteristicsFromUuid(self, request: ReadCharacteristicsFromUuidRequest,
                                          context: grpc.ServicerContext) -> ReadCharacteristicsFromUuidResponse:
        """Read characteristics of type `request.uuid` within a handle range.

        On `ProtocolError` the response holds a single entry carrying only the
        error's `error_code` as status.
        """
        peer = self._peer_for("ReadCharacteristicsFromUuid", request.connection.cookie.value)

        # Stand-in object exposing only `handle` and `end_group_handle`, passed
        # where `read_characteristics_by_uuid` takes its service argument.
        service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})()

        try:
            characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock)  # type: ignore

            return ReadCharacteristicsFromUuidResponse(characteristics_read=[
                ReadCharacteristicResponse(
                    value=AttValue(value=value, handle=handle),  # type: ignore
                    status=SUCCESS,
                ) for handle, value in characteristics  # type: ignore
            ])

        except ProtocolError as e:
            return ReadCharacteristicsFromUuidResponse(
                characteristics_read=[ReadCharacteristicResponse(status=e.error_code)]  # type: ignore
            )

    @utils.rpc
    async def ReadCharacteristicDescriptorFromHandle(
            self, request: ReadCharacteristicDescriptorRequest,
            context: grpc.ServicerContext) -> ReadCharacteristicDescriptorResponse:
        """Read the descriptor at `request.handle`.

        On `ProtocolError` the response carries an empty value and the error's
        `error_code` as status.
        """
        peer = self._peer_for("ReadCharacteristicDescriptorFromHandle", request.connection.cookie.value)

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status: AttStatusCode = SUCCESS
        except ProtocolError as e:
            value = bytes()
            status = e.error_code  # type: ignore

        return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=status)

    # NOTE(review): this is the only RPC declared with a plain `def`; confirm
    # that `utils.rpc` accepts non-coroutine functions.
    @utils.rpc
    def RegisterService(self, request: RegisterServiceRequest,
                        context: grpc.ServicerContext) -> RegisterServiceResponse:
        """Build a GATT service from `request.service` and add it to the local device."""
        logging.info("RegisterService")

        service_uuid = request.service.uuid
        characteristics = [
            Characteristic(
                properties=Characteristic.Properties(characteristic_param.properties),
                permissions=Attribute.Permissions(characteristic_param.permissions),
                uuid=characteristic_param.uuid,
                descriptors=[
                    Descriptor(
                        attribute_type=descriptor_param.uuid,
                        permissions=Attribute.Permissions(descriptor_param.permissions),
                    ) for descriptor_param in characteristic_param.descriptors
                ],
            ) for characteristic_param in request.service.characteristics
        ]
        service = Service(service_uuid, characteristics)
        self.device.add_service(service)  # type: ignore[no-untyped-call]

        logging.info("RegisterService complete")
        return RegisterServiceResponse()

    @utils.rpc
    async def NotifyOnCharacteristic(self, request: NotifyOnCharacteristicRequest,
                                     context: grpc.ServicerContext) -> NotifyOnCharacteristicResponse:
        """Notify subscribers of the local attribute at `request.handle` with `request.value`.

        Returns status `ATTRIBUTE_NOT_FOUND` when the local GATT server has no
        attribute for that handle.
        """
        logging.info("NotifyOnCharacteristic")

        attr = self.device.gatt_server.get_attribute(request.handle)
        if not attr:
            return NotifyOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND)
        await self.device.notify_subscribers(attr, request.value)
        return NotifyOnCharacteristicResponse(status=SUCCESS)

    @utils.rpc
    async def IndicateOnCharacteristic(self, request: IndicateOnCharacteristicRequest,
                                       context: grpc.ServicerContext) -> IndicateOnCharacteristicResponse:
        """Indicate to subscribers of the local attribute at `request.handle` with `request.value`.

        Returns status `ATTRIBUTE_NOT_FOUND` when the local GATT server has no
        attribute for that handle.
        """
        logging.info("IndicateOnCharacteristic")

        attr = self.device.gatt_server.get_attribute(request.handle)
        if not attr:
            return IndicateOnCharacteristicResponse(status=ATTRIBUTE_NOT_FOUND)
        await self.device.indicate_subscribers(attr, request.value)
        return IndicateOnCharacteristicResponse(status=SUCCESS)
313