• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22
23from colors import color
24from bumble.device import Device
25from bumble.transport import open_transport_or_link
26from bumble.core import (
27    BT_BR_EDR_TRANSPORT,
28    BT_AVDTP_PROTOCOL_ID,
29    BT_AUDIO_SINK_SERVICE,
30    BT_L2CAP_PROTOCOL_ID
31)
32from bumble.avdtp import (
33    Protocol as AVDTP_Protocol,
34    find_avdtp_service_with_connection
35)
36from bumble.a2dp import make_audio_source_service_sdp_records
37from bumble.sdp import (
38    Client as SDP_Client,
39    ServiceAttribute,
40    DataElement,
41    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
42    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
43    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
44)
45
46
# -----------------------------------------------------------------------------
def sdp_records():
    """
    Build the SDP service records exposed by this device.

    Returns:
        A dict with a single entry, mapping the service record handle
        0x00010001 to the value returned by
        `make_audio_source_service_sdp_records` for that handle.
    """
    handle = 0x00010001
    audio_source_records = make_audio_source_service_sdp_records(handle)
    return {handle: audio_source_records}
53
54
# -----------------------------------------------------------------------------
async def find_a2dp_service(device, connection):
    """
    Search the peer's SDP server for Audio Sink services and print the results.

    For each service record returned, this prints the service classes, the
    L2CAP PSM and AVDTP version found in the protocol descriptor list, and the
    entries of the Bluetooth profile descriptor list.

    Args:
        device: object passed to the `SDP_Client` constructor.
        connection: object passed to `SDP_Client.connect`.

    Returns:
        A `(major, minor)` tuple with the AVDTP version taken from the last
        service record that carried one, or `None` if no record did.
    """
    # Connect to the SDP Server
    sdp_client = SDP_Client(device)
    await sdp_client.connect(connection)

    service_version = None

    # From here on the SDP channel is open: make sure it is released even if
    # the search or the parsing of a malformed record raises.
    try:
        # Search for services with an Audio Sink service class
        search_result = await sdp_client.search_attributes(
            [BT_AUDIO_SINK_SERVICE],
            [
                SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
            ]
        )

        print(color('==================================', 'blue'))
        print(color('A2DP Sink Services:', 'yellow'))

        for attribute_list in search_result:
            print(color('SERVICE:', 'green'))

            # Service classes
            service_class_id_list = ServiceAttribute.find_attribute_in_list(
                attribute_list,
                SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
            )
            if service_class_id_list and service_class_id_list.value:
                print(color('  Service Classes:', 'green'))
                for service_class_id in service_class_id_list.value:
                    print('   ', service_class_id.value)

            # Protocol info
            protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
                attribute_list,
                SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
            )
            if protocol_descriptor_list:
                print(color('  Protocol:', 'green'))
                for protocol_descriptor in protocol_descriptor_list.value:
                    elements = protocol_descriptor.value
                    if len(elements) < 2:
                        # Only descriptors with a protocol ID *and* a parameter
                        # are reported; this also skips empty descriptors.
                        continue

                    protocol_id = elements[0].value
                    parameter = elements[1].value
                    if protocol_id == BT_L2CAP_PROTOCOL_ID:
                        psm = parameter
                        print(f'{color("    L2CAP PSM:", "cyan")}     {psm}')
                    elif protocol_id == BT_AVDTP_PROTOCOL_ID:
                        # The version is split into its high byte (major)
                        # and low byte (minor)
                        avdtp_version_major = parameter >> 8
                        avdtp_version_minor = parameter & 0xFF
                        print(f'{color("    AVDTP Version:", "cyan")} {avdtp_version_major}.{avdtp_version_minor}')
                        service_version = (avdtp_version_major, avdtp_version_minor)

            # Profile info
            bluetooth_profile_descriptor_list = ServiceAttribute.find_attribute_in_list(
                attribute_list,
                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
            )
            if bluetooth_profile_descriptor_list and bluetooth_profile_descriptor_list.value:
                if bluetooth_profile_descriptor_list.value[0].type == DataElement.SEQUENCE:
                    bluetooth_profile_descriptors = bluetooth_profile_descriptor_list.value
                else:
                    # Sometimes, instead of a list of lists, we just find a list. Fix that
                    bluetooth_profile_descriptors = [bluetooth_profile_descriptor_list]

                print(color('  Profiles:', 'green'))
                for bluetooth_profile_descriptor in bluetooth_profile_descriptors:
                    elements = bluetooth_profile_descriptor.value
                    if len(elements) < 2:
                        # Same guard as for protocol descriptors: a descriptor
                        # without a version element cannot be printed.
                        continue
                    version_major = elements[1].value >> 8
                    version_minor = elements[1].value & 0xFF
                    print(f'    {elements[0].value} - version {version_major}.{version_minor}')
    finally:
        await sdp_client.disconnect()

    return service_version
130
131
# -----------------------------------------------------------------------------
async def main():
    """
    Connect to a peer over BR/EDR and list its AVDTP endpoints.

    Expects three command-line arguments: a device configuration file, a
    transport spec and the peer's Bluetooth address. With fewer arguments, a
    usage message is printed and nothing else happens.

    The steps are: open the HCI transport, power on the device, publish the
    local SDP records, connect, authenticate, encrypt, look up the peer's
    A2DP sink service via `find_a2dp_service`, then open an AVDTP session and
    print every remote endpoint it discovers.
    """
    if len(sys.argv) < 4:
        print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
        print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
        return

    device_config, transport_spec, target_address = sys.argv[1:4]

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(transport_spec) as (hci_source, hci_sink):
        print('<<< connected')

        # Build the local device from its config file and enable Classic
        device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
        device.classic_enabled = True

        # Bring up the controller
        await device.power_on()

        # Publish a SRC service record, in case the remote device queries us back
        device.sdp_service_records = sdp_records()

        # Open a BR/EDR connection to the peer
        print(f'=== Connecting to {target_address}...')
        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
        print(f'=== Connected to {connection.peer_address}!')

        # Authenticate the link
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated')

        # Turn on encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        # Find out which AVDTP version the peer's A2DP sink advertises
        avdtp_version = await find_a2dp_service(device, connection)
        if not avdtp_version:
            print(color('!!! no AVDTP service found'))
            return
        print(f'AVDTP version: {avdtp_version[0]}.{avdtp_version[1]}')

        # Open an AVDTP session with the remote device
        avdtp_client = await AVDTP_Protocol.connect(connection, avdtp_version)

        # Enumerate and print the remote endpoints
        endpoints = await avdtp_client.discover_remote_endpoints()
        print(f'@@@ Found {len(endpoints)} endpoints')
        for remote_endpoint in endpoints:
            print('@@@', remote_endpoint)
184
185
# -----------------------------------------------------------------------------
# The log level is read from the BUMBLE_LOGLEVEL environment variable
# (case-insensitive), falling back to DEBUG when it is not set.
log_level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()
logging.basicConfig(level=log_level)

# Run the example to completion on a fresh event loop.
asyncio.run(main())
189