• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22
23from bumble.colors import color
24
25import bumble.core
26from bumble.device import Device
27from bumble.transport import open_transport_or_link
28from bumble.core import (
29    BT_HANDSFREE_SERVICE,
30    BT_RFCOMM_PROTOCOL_ID,
31    BT_BR_EDR_TRANSPORT,
32)
33from bumble.rfcomm import Client
34from bumble.sdp import (
35    Client as SDP_Client,
36    DataElement,
37    ServiceAttribute,
38    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
39    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
40    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
41)
42from bumble.hfp import HfpProtocol
43
44
45# -----------------------------------------------------------------------------
46# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
    """Discover and print the peer's Handsfree services over SDP.

    Connects an SDP client over `connection`, searches for records matching
    BT_HANDSFREE_SERVICE and, for every RFCOMM entry found in a record's
    protocol descriptor list, prints the channel number followed by the
    record's profile descriptors and service class IDs.

    Args:
        device: local device, passed to the SDP client constructor.
        connection: connection to the peer that the SDP client connects over.

    Returns:
        A list of the RFCOMM channel numbers found, in discovery order.
    """
    # Connect to the SDP Server
    sdp_client = SDP_Client(device)
    await sdp_client.connect(connection)

    # Once connected, always disconnect, even if the search or the parsing of
    # a record raises.
    try:
        # Search for services that support the Handsfree Profile
        search_result = await sdp_client.search_attributes(
            [BT_HANDSFREE_SERVICE],
            [
                SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
            ],
        )
        print(color('==================================', 'blue'))
        print(color('Handsfree Services:', 'yellow'))
        rfcomm_channels = []
        for attribute_list in search_result:
            # Look for the RFCOMM Channel number
            protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
                attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
            )
            if not protocol_descriptor_list:
                continue

            for protocol_descriptor in protocol_descriptor_list.value:
                elements = protocol_descriptor.value
                if len(elements) < 2:
                    continue
                if elements[0].value != BT_RFCOMM_PROTOCOL_ID:
                    continue

                channel = elements[1].value
                print(color('SERVICE:', 'green'))
                print(color('  RFCOMM Channel:', 'cyan'), channel)
                rfcomm_channels.append(channel)

                _print_profiles(attribute_list)
                _print_service_classes(attribute_list)
    finally:
        await sdp_client.disconnect()

    return rfcomm_channels


def _print_profiles(attribute_list):
    """Print the profile descriptors (ID and version) of one SDP record."""
    profile_descriptor_list = ServiceAttribute.find_attribute_in_list(
        attribute_list, SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
    )
    if not profile_descriptor_list or not profile_descriptor_list.value:
        return

    if profile_descriptor_list.value[0].type == DataElement.SEQUENCE:
        profile_descriptors = profile_descriptor_list.value
    else:
        # Sometimes, instead of a list of lists, we just find a list. Fix that
        profile_descriptors = [profile_descriptor_list]

    print(color('  Profiles:', 'green'))
    for profile_descriptor in profile_descriptors:
        if len(profile_descriptor.value) < 2:
            # Malformed descriptor without a version element: nothing to show
            continue
        # The version is split as major in the upper bits, minor in the low byte
        version = profile_descriptor.value[1].value
        version_major = version >> 8
        version_minor = version & 0xFF
        print(
            '    '
            f'{profile_descriptor.value[0].value}'
            f' - version {version_major}.{version_minor}'
        )


def _print_service_classes(attribute_list):
    """Print the service class IDs of one SDP record."""
    service_class_id_list = ServiceAttribute.find_attribute_in_list(
        attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
    )
    if not service_class_id_list or not service_class_id_list.value:
        return

    print(color('  Service Classes:', 'green'))
    for service_class_id in service_class_id_list.value:
        print('   ', service_class_id.value)
133
134
135# -----------------------------------------------------------------------------
# AT command prefixes handled by the test protocol loop, each paired with the
# response lines sent back. Entries are checked in order and the first prefix
# that matches the incoming line wins.
_AT_COMMAND_RESPONSES = (
    ('AT+BRSF=', ('+BRSF: 30', 'OK')),
    (
        'AT+CIND=?',
        (
            '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
            '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
            '("callheld",(0-2))',
            'OK',
        ),
    ),
    ('AT+CIND?', ('+CIND: 0,0,1,4,1,5,0', 'OK')),
    ('AT+CMER=', ('OK',)),
    ('AT+CHLD=?', ('+CHLD: 0', 'OK')),
    ('AT+BTRH?', ('+BTRH: 0', 'OK')),
    ('AT+CLIP=', ('OK',)),
    ('AT+VGS=', ('OK',)),
    ('AT+BIA=', ('OK',)),
    ('AT+BVRA=', ('+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"',)),
    ('AT+XEVENT=', ('OK',)),
    ('AT+XAPL=', ('OK',)),
)


async def _run_protocol_loop(protocol):
    """Answer incoming AT commands with canned responses, forever."""
    while True:
        line = await protocol.next_line()

        for prefix, responses in _AT_COMMAND_RESPONSES:
            if line.startswith(prefix):
                break
        else:
            print(color('UNSUPPORTED AT COMMAND', 'red'))
            responses = ('ERROR',)

        for response in responses:
            protocol.send_response_line(response)


async def main():
    """Run a minimal Handsfree gateway against a remote peer.

    Reads three command-line arguments: a device config file, a transport
    spec and the peer's Bluetooth address. Powers on the device, connects to
    the peer over BR/EDR, looks up the Handsfree RFCOMM channel via SDP,
    authenticates and encrypts the connection, opens an RFCOMM session on the
    first channel found, then answers AT commands with canned responses until
    the HCI transport terminates.

    Prints a usage message and returns when fewer than three arguments are
    given. Returns early when no Handsfree service is found or when the
    RFCOMM session cannot be opened.
    """
    if len(sys.argv) < 4:
        print(
            'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
            '<bluetooth-address>'
        )
        print('example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8')
        return

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
        print('<<< connected')

        # Create a device
        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
        device.classic_enabled = True
        await device.power_on()

        # Connect to a peer
        target_address = sys.argv[3]
        print(f'=== Connecting to {target_address}...')
        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
        print(f'=== Connected to {connection.peer_address}!')

        # Get a list of all the Handsfree services (should only be 1)
        channels = await list_rfcomm_channels(device, connection)
        if not channels:
            print('!!! no service found')
            return

        # Pick the first one
        channel = channels[0]

        # Request authentication
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated')

        # Enable encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        # Create a client and start it
        print('@@@ Starting to RFCOMM client...')
        rfcomm_client = Client(device, connection)
        rfcomm_mux = await rfcomm_client.start()
        print('@@@ Started')

        print(f'### Opening session for channel {channel}...')
        try:
            session = await rfcomm_mux.open_dlc(channel)
            print('### Session open', session)
        except bumble.core.ConnectionError as error:
            print(f'### Session open failed: {error}')
            await rfcomm_mux.disconnect()
            print('@@@ Disconnected from RFCOMM server')
            return

        # Protocol loop (just for testing at this point).
        # The loop never returns on its own, so it runs concurrently with the
        # wait for HCI termination; whichever finishes first ends the script.
        protocol = HfpProtocol(session)
        protocol_task = asyncio.ensure_future(_run_protocol_loop(protocol))
        termination = asyncio.ensure_future(hci_source.wait_for_termination())
        try:
            done, _ = await asyncio.wait(
                {protocol_task, termination},
                return_when=asyncio.FIRST_COMPLETED,
            )
            if protocol_task in done:
                # The loop can only finish by raising: surface that error
                protocol_task.result()
        finally:
            # The termination waiter is deliberately not cancelled here:
            # cancelling a task also cancels the awaitable it is waiting on,
            # which presumably belongs to the transport (TODO confirm).
            # asyncio.run() cleans it up on exit.
            protocol_task.cancel()
243
244
245# -----------------------------------------------------------------------------
# Configure logging from the BUMBLE_LOGLEVEL environment variable (DEBUG when
# unset), then run the gateway until main() returns.
_LOG_LEVEL = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()
logging.basicConfig(level=_LOG_LEVEL)
asyncio.run(main())
248