• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import grpc
17import logging
18
19from avatar.bumble_server import utils
20from bumble.core import ProtocolError
21from bumble.device import Connection as BumbleConnection, Device, Peer
22from bumble.gatt_client import CharacteristicProxy, ServiceProxy
23from pandora_experimental.gatt_grpc_aio import GATTServicer
24from pandora_experimental.gatt_pb2 import (
25    SUCCESS,
26    AttValue,
27    ClearCacheRequest,
28    ClearCacheResponse,
29    DiscoverServiceByUuidRequest,
30    DiscoverServicesRequest,
31    DiscoverServicesResponse,
32    ExchangeMTURequest,
33    ExchangeMTUResponse,
34    GattCharacteristic,
35    GattCharacteristicDescriptor,
36    GattService,
37    ReadCharacteristicDescriptorRequest,
38    ReadCharacteristicDescriptorResponse,
39    ReadCharacteristicRequest,
40    ReadCharacteristicResponse,
41    ReadCharacteristicsFromUuidRequest,
42    ReadCharacteristicsFromUuidResponse,
43    WriteRequest,
44    WriteResponse,
45)
46from typing import Dict, List
47
48
class GATTService(GATTServicer):
    """GATT client RPCs served on top of a Bumble `Device`.

    One `Peer` is created for every connection reported by the device and is
    stored in `peers` under the connection handle. Every RPC that targets a
    connection decodes that handle from `request.connection.cookie.value`
    (big-endian) and runs the GATT operation through the matching `Peer`.
    """

    device: Device
    peers: Dict[int, Peer]

    def __init__(self, device: Device) -> None:
        """Store `device` and subscribe to its connection/disconnection events."""
        super().__init__()
        self.device = device
        self.peers = {}
        self.device.on('connection', self.on_connection)  # type: ignore
        self.device.on('disconnection', self.on_disconnection)  # type: ignore

    def __del__(self) -> None:
        """Unsubscribe the listeners registered in `__init__`."""
        self.device.remove_listener('connection', self.on_connection)  # type: ignore
        self.device.remove_listener('disconnection', self.on_disconnection)  # type: ignore

    def on_connection(self, connection: BumbleConnection) -> None:
        """Create and remember a `Peer` for a newly reported connection."""
        self.peers[connection.handle] = Peer(connection)  # type: ignore[no-untyped-call]

    def on_disconnection(self, connection: BumbleConnection) -> None:
        """Forget the `Peer` associated with `connection`, if one is tracked."""
        # `pop` with a default instead of `del`: a disconnection for a handle that
        # was never recorded here must not raise `KeyError` out of the event
        # callback.
        self.peers.pop(connection.handle, None)

    def _peer_for(self, rpc_name: str, cookie: bytes) -> Peer:
        """Resolve the `Peer` designated by a connection cookie.

        Args:
            rpc_name: Name of the calling RPC, used as the log line prefix.
            cookie: Connection cookie bytes, decoded as a big-endian handle.

        Returns:
            The `Peer` stored for the handle of the looked-up connection.

        Raises:
            AssertionError: `device.lookup_connection` returned a falsy value.
            KeyError: No `Peer` is stored for the connection handle.
        """
        connection_handle = int.from_bytes(cookie, 'big')
        logging.info("%s: %s", rpc_name, connection_handle)

        connection = self.device.lookup_connection(connection_handle)
        # Deliberately left as an assertion so that a failed lookup keeps
        # surfacing as `AssertionError` to the `utils.rpc` wrapper.
        assert connection
        return self.peers[connection.handle]

    @staticmethod
    async def _discover_details(peer: Peer, services: List[ServiceProxy]) -> None:
        """Discover the characteristics, then their descriptors, of each service.

        One coroutine per service is scheduled through `asyncio.gather`; the
        results are not returned but are read back afterwards from
        `service.characteristics` / `characteristic.descriptors`.
        """

        async def feed_service(service: ServiceProxy) -> None:
            characteristic: CharacteristicProxy
            for characteristic in await peer.discover_characteristics(service=service):  # type: ignore
                await characteristic.discover_descriptors()  # type: ignore

        await asyncio.gather(*(feed_service(service) for service in services))

    @staticmethod
    def _services_response(services: List[ServiceProxy]) -> DiscoverServicesResponse:
        """Convert discovered service proxies into a `DiscoverServicesResponse`.

        Service and characteristic UUIDs are rendered with `to_hex_str('-')`,
        descriptor UUIDs with `str(descriptor.type)`. Permissions are not
        available here and are always reported as 0.
        """
        return DiscoverServicesResponse(
            services=[
                GattService(
                    handle=service.handle,
                    type=int.from_bytes(bytes(service.type), 'little'),
                    uuid=service.uuid.to_hex_str('-'),  # type: ignore
                    characteristics=[
                        GattCharacteristic(
                            properties=characteristic.properties,  # type: ignore
                            permissions=0,  # TODO
                            uuid=characteristic.uuid.to_hex_str('-'),  # type: ignore
                            handle=characteristic.handle,  # type: ignore
                            descriptors=[
                                GattCharacteristicDescriptor(
                                    handle=descriptor.handle,  # type: ignore
                                    permissions=0,  # TODO
                                    uuid=str(descriptor.type),  # type: ignore
                                )
                                for descriptor in characteristic.descriptors  # type: ignore
                            ],
                        )
                        for characteristic in service.characteristics  # type: ignore
                    ],
                )
                for service in services
            ]
        )

    @utils.rpc
    async def ExchangeMTU(self, request: ExchangeMTURequest, context: grpc.ServicerContext) -> ExchangeMTUResponse:
        """Request `request.mtu` from the peer and return an empty response.

        Raises:
            AssertionError: The MTU returned by the peer differs from the
                requested one, or the connection could not be looked up.
        """
        peer = self._peer_for("ExchangeMTU", request.connection.cookie.value)

        mtu = await peer.request_mtu(request.mtu)  # type: ignore
        # The RPC only succeeds when exactly the requested MTU was obtained.
        assert mtu == request.mtu

        return ExchangeMTUResponse()

    @utils.rpc
    async def WriteAttFromHandle(self, request: WriteRequest, context: grpc.ServicerContext) -> WriteResponse:
        """Write `request.value` to attribute `request.handle`, with response.

        A `ProtocolError` raised by the write is not propagated: its
        `error_code` is returned as the response status instead.
        """
        peer = self._peer_for("WriteAttFromHandle", request.connection.cookie.value)

        try:
            await peer.write_value(request.handle, request.value, with_response=True)  # type: ignore
            status = SUCCESS
        except ProtocolError as e:
            status = e.error_code

        return WriteResponse(handle=request.handle, status=status)

    @utils.rpc
    async def DiscoverServiceByUuid(
        self, request: DiscoverServiceByUuidRequest, context: grpc.ServicerContext
    ) -> DiscoverServicesResponse:
        """Discover the services matching `request.uuid`, with their details."""
        peer = self._peer_for("DiscoverServiceByUuid", request.connection.cookie.value)

        services: List[ServiceProxy] = await peer.discover_service(request.uuid)  # type: ignore
        await self._discover_details(peer, services)

        return self._services_response(services)

    @utils.rpc
    async def DiscoverServices(
        self, request: DiscoverServicesRequest, context: grpc.ServicerContext
    ) -> DiscoverServicesResponse:
        """Discover all services of the peer, with their details."""
        peer = self._peer_for("DiscoverServices", request.connection.cookie.value)

        services: List[ServiceProxy] = await peer.discover_services()  # type: ignore
        await self._discover_details(peer, services)

        return self._services_response(services)

    # TODO: implement `DiscoverServicesSdp`

    @utils.rpc
    async def ClearCache(self, request: ClearCacheRequest, context: grpc.ServicerContext) -> ClearCacheResponse:
        """Log the call and return an empty response; nothing is cleared here."""
        logging.info("ClearCache")
        return ClearCacheResponse()

    @utils.rpc
    async def ReadCharacteristicFromHandle(
        self, request: ReadCharacteristicRequest, context: grpc.ServicerContext
    ) -> ReadCharacteristicResponse:
        """Read the attribute at `request.handle`.

        On `ProtocolError` the response carries an empty value and the error's
        `error_code` as status.
        """
        peer = self._peer_for("ReadCharacteristicFromHandle", request.connection.cookie.value)

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status = SUCCESS
        except ProtocolError as e:
            value = bytes()
            status = e.error_code

        return ReadCharacteristicResponse(value=AttValue(value=value), status=status)

    @utils.rpc
    async def ReadCharacteristicsFromUuid(
        self, request: ReadCharacteristicsFromUuidRequest, context: grpc.ServicerContext
    ) -> ReadCharacteristicsFromUuidResponse:
        """Read characteristics by UUID between `start_handle` and `end_handle`.

        On `ProtocolError` the response holds a single entry carrying only the
        error's `error_code` as status.
        """
        peer = self._peer_for("ReadCharacteristicsFromUuid", request.connection.cookie.value)

        # Anonymous stand-in object exposing only `handle` and `end_group_handle`,
        # passed where `read_characteristics_by_uuid` takes a service so that the
        # request's handle range is used.
        service_mock = type('', (), {'handle': request.start_handle, 'end_group_handle': request.end_handle})()

        try:
            characteristics = await peer.read_characteristics_by_uuid(request.uuid, service_mock)  # type: ignore
        except ProtocolError as e:
            return ReadCharacteristicsFromUuidResponse(
                characteristics_read=[ReadCharacteristicResponse(status=e.error_code)]
            )

        return ReadCharacteristicsFromUuidResponse(
            characteristics_read=[
                ReadCharacteristicResponse(
                    value=AttValue(value=value, handle=handle),  # type: ignore
                    status=SUCCESS,
                )
                for handle, value in characteristics  # type: ignore
            ]
        )

    @utils.rpc
    async def ReadCharacteristicDescriptorFromHandle(
        self, request: ReadCharacteristicDescriptorRequest, context: grpc.ServicerContext
    ) -> ReadCharacteristicDescriptorResponse:
        """Read the descriptor attribute at `request.handle`.

        On `ProtocolError` the response carries an empty value and the error's
        `error_code` as status.
        """
        peer = self._peer_for("ReadCharacteristicDescriptorFromHandle", request.connection.cookie.value)

        try:
            value = await peer.read_value(request.handle)  # type: ignore
            status = SUCCESS
        except ProtocolError as e:
            value = bytes()
            status = e.error_code

        return ReadCharacteristicDescriptorResponse(value=AttValue(value=value), status=status)
273