• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22
23from bumble.device import Device
24from bumble.transport import open_transport_or_link
25from bumble.core import BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, UUID
26from bumble.rfcomm import Server
27from bumble.sdp import (
28    DataElement,
29    ServiceAttribute,
30    SDP_PUBLIC_BROWSE_ROOT,
31    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
32    SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
33    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
34    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
35)
36
37
38# -----------------------------------------------------------------------------
def sdp_records(channel):
    """Build the SDP service record table for the RFCOMM server.

    Args:
        channel: RFCOMM channel number, encoded as an 8-bit unsigned integer
            in the RFCOMM protocol descriptor.

    Returns:
        A dict with a single entry, keyed by the record handle 0x00010001,
        whose value is the list of ServiceAttribute objects for that record.
    """
    record_handle = 0x00010001

    # Attribute carrying the record's own handle.
    handle_attribute = ServiceAttribute(
        SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
        DataElement.unsigned_integer_32(record_handle),
    )

    # Attribute listing the public browse root as the browse group.
    browse_group_attribute = ServiceAttribute(
        SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
        DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
    )

    # Attribute listing the service class UUID used by this example.
    service_class_uuid = UUID('E6D55659-C8B4-4B85-96BB-B1143AF6D3AE')
    service_class_attribute = ServiceAttribute(
        SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
        DataElement.sequence([DataElement.uuid(service_class_uuid)]),
    )

    # Protocol descriptor list: L2CAP first, then RFCOMM with its channel.
    l2cap_descriptor = DataElement.sequence(
        [DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
    )
    rfcomm_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
            DataElement.unsigned_integer_8(channel),
        ]
    )
    protocol_attribute = ServiceAttribute(
        SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        DataElement.sequence([l2cap_descriptor, rfcomm_descriptor]),
    )

    return {
        record_handle: [
            handle_attribute,
            browse_group_attribute,
            service_class_attribute,
            protocol_attribute,
        ]
    }
72
73
74# -----------------------------------------------------------------------------
def on_dlc(dlc):
    """Handle a newly connected DLC by attaching the data handler as its sink.

    Args:
        dlc: The connected DLC object; its `sink` attribute is set to a
            callable that receives each incoming data chunk.
    """
    print('*** DLC connected', dlc)

    def forward_to_handler(data):
        # Bind this DLC so the handler can echo back on the same connection.
        return on_rfcomm_data_received(dlc, data)

    dlc.sink = forward_to_handler
78
79
80# -----------------------------------------------------------------------------
def on_rfcomm_data_received(dlc, data):
    """Log a received RFCOMM payload and echo it back on the same DLC.

    The payload is always printed as hex. If it is also valid UTF-8, the
    decoded text is printed as well; this is best-effort and never prevents
    the echo.

    Args:
        dlc: The DLC the data arrived on; `dlc.write` is called with the
            same bytes.
        data: The received payload (a bytes-like object providing `hex()`
            and `decode()`).
    """
    print(f'<<< Data received: {data.hex()}')
    try:
        message = data.decode('utf-8')
        print(f'<<< Message = {message}')
    except UnicodeError:
        # Not displayable text (invalid UTF-8, or the console cannot encode
        # it): the hex dump above is enough. Only Unicode errors are ignored
        # so that unrelated failures are not silently hidden.
        pass

    # Echo everything back
    dlc.write(data)
91
92
93# -----------------------------------------------------------------------------
async def main():
    """Run the RFCOMM echo server until the HCI transport terminates.

    Reads the device configuration path and the transport spec from
    `sys.argv[1]` and `sys.argv[2]`. When fewer than two arguments are
    given, prints a usage message and returns without doing anything else.
    """
    arguments = sys.argv[1:]
    if len(arguments) < 2:
        print('Usage: run_rfcomm_server.py <device-config> <transport-spec>')
        print('example: run_rfcomm_server.py classic2.json usb:04b4:f901')
        return

    config_path, transport_spec = arguments[:2]

    print('<<< connecting to HCI...')
    transport = await open_transport_or_link(transport_spec)
    async with transport as (hci_source, hci_sink):
        print('<<< connected')

        # Build the device from its config file, with classic enabled
        device = Device.from_config_file_with_hci(config_path, hci_source, hci_sink)
        device.classic_enabled = True

        # Attach an RFCOMM server and accept incoming DLCs
        server = Server(device)
        channel = server.listen(on_dlc)
        print(f'### Listening for connection on channel {channel}')

        # Publish the channel through SDP
        device.sdp_service_records = sdp_records(channel)

        # Power on the controller, then become discoverable and connectable
        await device.power_on()
        await device.set_discoverable(True)
        await device.set_connectable(True)

        # Keep running until the transport goes away
        await hci_source.wait_for_termination()
126
127
128# -----------------------------------------------------------------------------
# Log level comes from the BUMBLE_LOGLEVEL environment variable
# (case-insensitive), defaulting to DEBUG when it is not set.
logging.basicConfig(
    level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper(),
)

# Run the server until main() returns.
asyncio.run(main())
131