• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import struct
21import logging
22import click
23
24from bumble.colors import color
25from bumble.device import Device, Peer
26from bumble.core import AdvertisingData
27from bumble.gatt import Service, Characteristic, CharacteristicValue
28from bumble.utils import AsyncRunner
29from bumble.transport import open_transport_or_link
30from bumble.hci import HCI_Constant
31
32
33# -----------------------------------------------------------------------------
34# Constants
35# -----------------------------------------------------------------------------
36GG_GATTLINK_SERVICE_UUID = 'ABBAFF00-E56A-484C-B832-8B17CF6CBFE8'
37GG_GATTLINK_RX_CHARACTERISTIC_UUID = 'ABBAFF01-E56A-484C-B832-8B17CF6CBFE8'
38GG_GATTLINK_TX_CHARACTERISTIC_UUID = 'ABBAFF02-E56A-484C-B832-8B17CF6CBFE8'
39GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID = (
40    'ABBAFF03-E56A-484C-B832-8B17CF6CBFE8'
41)
42
43GG_PREFERRED_MTU = 256
44
45
46# -----------------------------------------------------------------------------
47class GattlinkL2capEndpoint:
48    def __init__(self):
49        self.l2cap_channel = None
50        self.l2cap_packet = b''
51        self.l2cap_packet_size = 0
52
53    # Called when an L2CAP SDU has been received
54    def on_coc_sdu(self, sdu):
55        print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
56        while len(sdu):
57            if self.l2cap_packet_size == 0:
58                # Expect a new packet
59                self.l2cap_packet_size = sdu[0] + 1
60                sdu = sdu[1:]
61            else:
62                bytes_needed = self.l2cap_packet_size - len(self.l2cap_packet)
63                chunk = min(bytes_needed, len(sdu))
64                self.l2cap_packet += sdu[:chunk]
65                sdu = sdu[chunk:]
66                if len(self.l2cap_packet) == self.l2cap_packet_size:
67                    self.on_l2cap_packet(self.l2cap_packet)
68                    self.l2cap_packet = b''
69                    self.l2cap_packet_size = 0
70
71
72# -----------------------------------------------------------------------------
73class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
74    def __init__(self, device, peer_address):
75        super().__init__()
76        self.device = device
77        self.peer_address = peer_address
78        self.peer = None
79        self.tx_socket = None
80        self.rx_characteristic = None
81        self.tx_characteristic = None
82        self.l2cap_psm_characteristic = None
83
84        device.listener = self
85
86    async def start(self):
87        # Connect to the peer
88        print(f'=== Connecting to {self.peer_address}...')
89        await self.device.connect(self.peer_address)
90
91    async def connect_l2cap(self, psm):
92        print(color(f'### Connecting with L2CAP on PSM = {psm}', 'yellow'))
93        try:
94            self.l2cap_channel = await self.peer.connection.open_l2cap_channel(psm)
95            print(color('*** Connected', 'yellow'), self.l2cap_channel)
96            self.l2cap_channel.sink = self.on_coc_sdu
97
98        except Exception as error:
99            print(color(f'!!! Connection failed: {error}', 'red'))
100
101    @AsyncRunner.run_in_task()
102    # pylint: disable=invalid-overridden-method
103    async def on_connection(self, connection):
104        print(f'=== Connected to {connection}')
105        self.peer = Peer(connection)
106
107        # Request a larger MTU than the default
108        server_mtu = await self.peer.request_mtu(GG_PREFERRED_MTU)
109        print(f'### Server MTU = {server_mtu}')
110
111        # Discover all services
112        print(color('=== Discovering services', 'yellow'))
113        await self.peer.discover_service(GG_GATTLINK_SERVICE_UUID)
114        print(color('=== Services discovered', 'yellow'), self.peer.services)
115        for service in self.peer.services:
116            print(service)
117        services = self.peer.get_services_by_uuid(GG_GATTLINK_SERVICE_UUID)
118        if not services:
119            print(color('!!! Gattlink service not found', 'red'))
120            return
121
122        # Use the first Gattlink (there should only be one anyway)
123        gattlink_service = services[0]
124
125        # Discover all the characteristics for the service
126        characteristics = await gattlink_service.discover_characteristics()
127        print(color('=== Characteristics discovered', 'yellow'))
128        for characteristic in characteristics:
129            if characteristic.uuid == GG_GATTLINK_RX_CHARACTERISTIC_UUID:
130                self.rx_characteristic = characteristic
131            elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID:
132                self.tx_characteristic = characteristic
133            elif (
134                characteristic.uuid == GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID
135            ):
136                self.l2cap_psm_characteristic = characteristic
137        print('RX:', self.rx_characteristic)
138        print('TX:', self.tx_characteristic)
139        print('PSM:', self.l2cap_psm_characteristic)
140
141        if self.l2cap_psm_characteristic:
142            # Subscribe to and then read the PSM value
143            await self.peer.subscribe(
144                self.l2cap_psm_characteristic, self.on_l2cap_psm_received
145            )
146            psm_bytes = await self.peer.read_value(self.l2cap_psm_characteristic)
147            psm = struct.unpack('<H', psm_bytes)[0]
148            await self.connect_l2cap(psm)
149        elif self.tx_characteristic:
150            # Subscribe to TX
151            await self.peer.subscribe(self.tx_characteristic, self.on_tx_received)
152            print(color('=== Subscribed to Gattlink TX', 'yellow'))
153        else:
154            print(color('!!! No Gattlink TX or PSM found', 'red'))
155
156    def on_connection_failure(self, error):
157        print(color(f'!!! Connection failed: {error}'))
158
159    def on_disconnection(self, reason):
160        print(
161            color(
162                f'!!! Disconnected from {self.peer}, '
163                f'reason={HCI_Constant.error_name(reason)}',
164                'red',
165            )
166        )
167        self.tx_characteristic = None
168        self.rx_characteristic = None
169        self.peer = None
170
171    # Called when an L2CAP packet has been received
172    def on_l2cap_packet(self, packet):
173        print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
174        print(color('>>> [UDP]', 'magenta'))
175        self.tx_socket.sendto(packet)
176
177    # Called by the GATT client when a notification is received
178    def on_tx_received(self, value):
179        print(color(f'<<< [GATT TX]: {len(value)} bytes', 'cyan'))
180        if self.tx_socket:
181            print(color('>>> [UDP]', 'magenta'))
182            self.tx_socket.sendto(value)
183
184    # Called by asyncio when the UDP socket is created
185    def on_l2cap_psm_received(self, value):
186        psm = struct.unpack('<H', value)[0]
187        asyncio.create_task(self.connect_l2cap(psm))
188
189    # Called by asyncio when the UDP socket is created
190    def connection_made(self, transport):
191        pass
192
193    # Called by asyncio when a UDP datagram is received
194    def datagram_received(self, data, _address):
195        print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
196
197        if self.l2cap_channel:
198            print(color('>>> [L2CAP]', 'yellow'))
199            self.l2cap_channel.write(bytes([len(data) - 1]) + data)
200        elif self.peer and self.rx_characteristic:
201            print(color('>>> [GATT RX]', 'yellow'))
202            asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
203
204
205# -----------------------------------------------------------------------------
206class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
207    def __init__(self, device):
208        super().__init__()
209        self.device = device
210        self.peer = None
211        self.tx_socket = None
212        self.tx_subscriber = None
213        self.rx_characteristic = None
214        self.transport = None
215
216        # Register as a listener
217        device.listener = self
218
219        # Listen for incoming L2CAP CoC connections
220        psm = 0xFB
221        device.register_l2cap_channel_server(0xFB, self.on_coc)
222        print(f'### Listening for CoC connection on PSM {psm}')
223
224        # Setup the Gattlink service
225        self.rx_characteristic = Characteristic(
226            GG_GATTLINK_RX_CHARACTERISTIC_UUID,
227            Characteristic.WRITE_WITHOUT_RESPONSE,
228            Characteristic.WRITEABLE,
229            CharacteristicValue(write=self.on_rx_write),
230        )
231        self.tx_characteristic = Characteristic(
232            GG_GATTLINK_TX_CHARACTERISTIC_UUID,
233            Characteristic.NOTIFY,
234            Characteristic.READABLE,
235        )
236        self.tx_characteristic.on('subscription', self.on_tx_subscription)
237        self.psm_characteristic = Characteristic(
238            GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
239            Characteristic.READ | Characteristic.NOTIFY,
240            Characteristic.READABLE,
241            bytes([psm, 0]),
242        )
243        gattlink_service = Service(
244            GG_GATTLINK_SERVICE_UUID,
245            [self.rx_characteristic, self.tx_characteristic, self.psm_characteristic],
246        )
247        device.add_services([gattlink_service])
248        device.advertising_data = bytes(
249            AdvertisingData(
250                [
251                    (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
252                    (
253                        AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
254                        bytes(
255                            reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))
256                        ),
257                    ),
258                ]
259            )
260        )
261
262    async def start(self):
263        await self.device.start_advertising()
264
265    # Called by asyncio when the UDP socket is created
266    def connection_made(self, transport):
267        self.transport = transport
268
269    # Called by asyncio when a UDP datagram is received
270    def datagram_received(self, data, _address):
271        print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
272
273        if self.l2cap_channel:
274            print(color('>>> [L2CAP]', 'yellow'))
275            self.l2cap_channel.write(bytes([len(data) - 1]) + data)
276        elif self.tx_subscriber:
277            print(color('>>> [GATT TX]', 'yellow'))
278            self.tx_characteristic.value = data
279            asyncio.create_task(self.device.notify_subscribers(self.tx_characteristic))
280
281    # Called when a write to the RX characteristic has been received
282    def on_rx_write(self, _connection, data):
283        print(color(f'<<< [GATT RX]: {len(data)} bytes', 'cyan'))
284        print(color('>>> [UDP]', 'magenta'))
285        self.tx_socket.sendto(data)
286
287    # Called when the subscription to the TX characteristic has changed
288    def on_tx_subscription(self, peer, enabled):
289        print(
290            f'### [GATT TX] subscription from {peer}: '
291            f'{"enabled" if enabled else "disabled"}'
292        )
293        if enabled:
294            self.tx_subscriber = peer
295        else:
296            self.tx_subscriber = None
297
298    # Called when an L2CAP packet is received
299    def on_l2cap_packet(self, packet):
300        print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
301        print(color('>>> [UDP]', 'magenta'))
302        self.tx_socket.sendto(packet)
303
304    # Called when a new connection is established
305    def on_coc(self, channel):
306        print('*** CoC Connection', channel)
307        self.l2cap_channel = channel
308        channel.sink = self.on_coc_sdu
309
310
311# -----------------------------------------------------------------------------
312async def run(
313    hci_transport,
314    device_address,
315    role_or_peer_address,
316    send_host,
317    send_port,
318    receive_host,
319    receive_port,
320):
321    print('<<< connecting to HCI...')
322    async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
323        print('<<< connected')
324
325        # Instantiate a bridge object
326        device = Device.with_hci('Bumble GG', device_address, hci_source, hci_sink)
327
328        # Instantiate a bridge object
329        if role_or_peer_address == 'node':
330            bridge = GattlinkNodeBridge(device)
331        else:
332            bridge = GattlinkHubBridge(device, role_or_peer_address)
333
334        # Create a UDP to RX bridge (receive from UDP, send to RX)
335        loop = asyncio.get_running_loop()
336        await loop.create_datagram_endpoint(
337            lambda: bridge, local_addr=(receive_host, receive_port)
338        )
339
340        # Create a UDP to TX bridge (receive from TX, send to UDP)
341        bridge.tx_socket, _ = await loop.create_datagram_endpoint(
342            # pylint: disable-next=unnecessary-lambda
343            lambda: asyncio.DatagramProtocol(),
344            remote_addr=(send_host, send_port),
345        )
346
347        await device.power_on()
348        await bridge.start()
349
350        # Wait until the source terminates
351        await hci_source.wait_for_termination()
352
353
354@click.command()
355@click.argument('hci_transport')
356@click.argument('device_address')
357@click.argument('role_or_peer_address')
358@click.option(
359    '-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to'
360)
361@click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to')
362@click.option(
363    '-rh',
364    '--receive-host',
365    type=str,
366    default='127.0.0.1',
367    help='UDP host to receive on',
368)
369@click.option(
370    '-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on'
371)
372def main(
373    hci_transport,
374    device_address,
375    role_or_peer_address,
376    send_host,
377    send_port,
378    receive_host,
379    receive_port,
380):
381    asyncio.run(
382        run(
383            hci_transport,
384            device_address,
385            role_or_peer_address,
386            send_host,
387            send_port,
388            receive_host,
389            receive_port,
390        )
391    )
392
393
394# -----------------------------------------------------------------------------
395logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
396if __name__ == '__main__':
397    main()
398