1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import sys 20import os 21import logging 22 23from colors import color 24from bumble.device import Device 25from bumble.transport import open_transport_or_link 26from bumble.core import ( 27 BT_BR_EDR_TRANSPORT, 28 BT_AVDTP_PROTOCOL_ID, 29 BT_AUDIO_SINK_SERVICE, 30 BT_L2CAP_PROTOCOL_ID 31) 32from bumble.avdtp import ( 33 Protocol as AVDTP_Protocol, 34 find_avdtp_service_with_connection 35) 36from bumble.a2dp import make_audio_source_service_sdp_records 37from bumble.sdp import ( 38 Client as SDP_Client, 39 ServiceAttribute, 40 DataElement, 41 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 42 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 43 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID 44) 45 46 47# ----------------------------------------------------------------------------- 48def sdp_records(): 49 service_record_handle = 0x00010001 50 return { 51 service_record_handle: make_audio_source_service_sdp_records(service_record_handle) 52 } 53 54 55# ----------------------------------------------------------------------------- 56async def find_a2dp_service(device, connection): 57 # Connect to the SDP Server 58 sdp_client = SDP_Client(device) 59 await sdp_client.connect(connection) 60 61 # Search for services with an Audio Sink service class 62 search_result = await sdp_client.search_attributes( 63 [BT_AUDIO_SINK_SERVICE], 64 [ 65 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 66 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 67 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID 68 ] 69 ) 70 71 print(color('==================================', 'blue')) 72 print(color('A2DP Sink Services:', 'yellow')) 73 74 service_version = None 75 76 for attribute_list in search_result: 77 print(color('SERVICE:', 'green')) 78 79 # Service classes 80 service_class_id_list = ServiceAttribute.find_attribute_in_list( 81 attribute_list, 82 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID 83 ) 84 if service_class_id_list: 85 if service_class_id_list.value: 86 print(color(' Service Classes:', 'green')) 87 for service_class_id in service_class_id_list.value: 88 print(' ', service_class_id.value) 89 90 # Protocol info 91 protocol_descriptor_list = ServiceAttribute.find_attribute_in_list( 92 attribute_list, 93 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID 94 ) 95 if protocol_descriptor_list: 96 print(color(' Protocol:', 'green')) 97 for protocol_descriptor in protocol_descriptor_list.value: 98 if protocol_descriptor.value[0].value == BT_L2CAP_PROTOCOL_ID: 99 if len(protocol_descriptor.value) >= 2: 100 psm = protocol_descriptor.value[1].value 101 print(f'{color(" L2CAP PSM:", "cyan")} {psm}') 102 elif protocol_descriptor.value[0].value == BT_AVDTP_PROTOCOL_ID: 103 if len(protocol_descriptor.value) >= 2: 104 avdtp_version_major = protocol_descriptor.value[1].value >> 8 105 avdtp_version_minor = protocol_descriptor.value[1].value & 0xFF 106 print(f'{color(" AVDTP Version:", "cyan")} {avdtp_version_major}.{avdtp_version_minor}') 107 service_version = (avdtp_version_major, avdtp_version_minor) 108 109 # Profile info 110 bluetooth_profile_descriptor_list = ServiceAttribute.find_attribute_in_list( 111 attribute_list, 112 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID 113 ) 114 if bluetooth_profile_descriptor_list: 115 if bluetooth_profile_descriptor_list.value: 116 if bluetooth_profile_descriptor_list.value[0].type == DataElement.SEQUENCE: 117 bluetooth_profile_descriptors = bluetooth_profile_descriptor_list.value 118 else: 119 # Sometimes, instead of a list of lists, we just find a list. Fix that 120 bluetooth_profile_descriptors = [bluetooth_profile_descriptor_list] 121 122 print(color(' Profiles:', 'green')) 123 for bluetooth_profile_descriptor in bluetooth_profile_descriptors: 124 version_major = bluetooth_profile_descriptor.value[1].value >> 8 125 version_minor = bluetooth_profile_descriptor.value[1].value & 0xFF 126 print(f' {bluetooth_profile_descriptor.value[0].value} - version {version_major}.{version_minor}') 127 128 await sdp_client.disconnect() 129 return service_version 130 131 132# ----------------------------------------------------------------------------- 133async def main(): 134 if len(sys.argv) < 4: 135 print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>') 136 print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8') 137 return 138 139 print('<<< connecting to HCI...') 140 async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): 141 print('<<< connected') 142 143 # Create a device 144 device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) 145 device.classic_enabled = True 146 147 # Start the controller 148 await device.power_on() 149 150 # Setup the SDP to expose a SRC service, in case the remote device queries us back 151 device.sdp_service_records = sdp_records() 152 153 # Connect to a peer 154 target_address = sys.argv[3] 155 print(f'=== Connecting to {target_address}...') 156 connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) 157 print(f'=== Connected to {connection.peer_address}!') 158 159 # Request authentication 160 print('*** Authenticating...') 161 await connection.authenticate() 162 print('*** Authenticated') 163 164 # Enable encryption 165 print('*** Enabling encryption...') 166 await connection.encrypt() 167 print('*** Encryption on') 168 169 # Look for an A2DP service 170 avdtp_version = await find_a2dp_service(device, connection) 171 if not avdtp_version: 172 print(color('!!! no AVDTP service found')) 173 return 174 print(f'AVDTP version: {avdtp_version[0]}.{avdtp_version[1]}') 175 176 # Create a client to interact with the remote device 177 client = await AVDTP_Protocol.connect(connection, avdtp_version) 178 179 # Discover all endpoints on the remote device 180 endpoints = await client.discover_remote_endpoints() 181 print(f'@@@ Found {len(endpoints)} endpoints') 182 for endpoint in endpoints: 183 print('@@@', endpoint) 184 185 186# ----------------------------------------------------------------------------- 187logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 188asyncio.run(main()) 189