# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging

from bumble.colors import color

import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
    BT_L2CAP_PROTOCOL_ID,
    BT_RFCOMM_PROTOCOL_ID,
    BT_BR_EDR_TRANSPORT,
)
from bumble.rfcomm import Client
from bumble.sdp import (
    Client as SDP_Client,
    DataElement,
    ServiceAttribute,
    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)


# -----------------------------------------------------------------------------
def _print_profiles(attribute_list):
    """Print the Bluetooth profile descriptors found in `attribute_list`."""
    profile_descriptor_list = ServiceAttribute.find_attribute_in_list(
        attribute_list,
        SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    )
    if not profile_descriptor_list or not profile_descriptor_list.value:
        return

    if profile_descriptor_list.value[0].type == DataElement.SEQUENCE:
        profile_descriptors = profile_descriptor_list.value
    else:
        # Sometimes, instead of a list of lists, we just find a list.
        # Wrap it so that both shapes are handled by the same loop.
        profile_descriptors = [profile_descriptor_list]

    print(color(' Profiles:', 'green'))
    for profile_descriptor in profile_descriptors:
        # The version is packed as (major << 8) | minor
        version = profile_descriptor.value[1].value
        version_major = version >> 8
        version_minor = version & 0xFF
        print(
            f' {profile_descriptor.value[0].value}'
            f' - version {version_major}.{version_minor}'
        )


# -----------------------------------------------------------------------------
def _print_service_classes(attribute_list):
    """Print the service class IDs found in `attribute_list`."""
    service_class_id_list = ServiceAttribute.find_attribute_in_list(
        attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
    )
    if not service_class_id_list or not service_class_id_list.value:
        return

    print(color(' Service Classes:', 'green'))
    for service_class_id in service_class_id_list.value:
        print(' ', service_class_id.value)


# -----------------------------------------------------------------------------
async def list_rfcomm_channels(device, connection):
    """
    Query the peer's SDP server and print every RFCOMM service it exposes.

    For each protocol descriptor whose first element is the RFCOMM protocol
    ID, this prints the RFCOMM channel number, followed by the profile
    descriptors and the service class IDs of the enclosing service record.

    Args:
        device: the local device, used to create the SDP client.
        connection: the connection over which the SDP client connects.
    """
    # Connect to the SDP Server
    sdp_client = SDP_Client(device)
    await sdp_client.connect(connection)

    try:
        # Search for services with an L2CAP service attribute
        search_result = await sdp_client.search_attributes(
            [BT_L2CAP_PROTOCOL_ID],
            [
                SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
                SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
            ],
        )
        print(color('==================================', 'blue'))
        print(color('RFCOMM Services:', 'yellow'))
        for attribute_list in search_result:
            # Look for the RFCOMM Channel number
            protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
                attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
            )
            if not protocol_descriptor_list:
                continue

            for protocol_descriptor in protocol_descriptor_list.value:
                # An RFCOMM descriptor carries the protocol ID and the channel
                if len(protocol_descriptor.value) < 2:
                    continue
                if protocol_descriptor.value[0].value != BT_RFCOMM_PROTOCOL_ID:
                    continue

                print(color('SERVICE:', 'green'))
                print(
                    color(' RFCOMM Channel:', 'cyan'),
                    protocol_descriptor.value[1].value,
                )

                # List profiles
                _print_profiles(attribute_list)

                # List service classes
                _print_service_classes(attribute_list)
    finally:
        # Always release the SDP channel, even if the search or parsing failed
        await sdp_client.disconnect()


# -----------------------------------------------------------------------------
class TcpServerProtocol(asyncio.Protocol):
    """
    asyncio protocol that relays bytes between a TCP connection and an RFCOMM
    session: TCP data is written to the session, and the session's sink is
    pointed at this protocol so RFCOMM data is written to the TCP transport.
    """

    def __init__(self, rfcomm_session):
        self.rfcomm_session = rfcomm_session
        # Set when a TCP connection is made, cleared when it is lost
        self.transport = None

    def connection_made(self, transport):
        # NOTE: the asyncio key is 'peername' (no underscore); any other key
        # makes get_extra_info() return None.
        peer_name = transport.get_extra_info('peername')
        print(f'<<< TCP Server: connection from {peer_name}')
        self.transport = transport
        self.rfcomm_session.sink = self.rfcomm_data_received

    def connection_lost(self, exc):
        # Forget the transport so that RFCOMM data arriving afterwards is
        # reported as dropped instead of being written to a closed transport.
        print('<<< TCP Server: connection lost')
        self.transport = None

    def rfcomm_data_received(self, data):
        print(f'<<< RFCOMM Data: {data.hex()}')
        if self.transport:
            self.transport.write(data)
        else:
            print('!!! no TCP connection, dropping data')

    def data_received(self, data):
        print(f'<<< TCP Server: data received: {len(data)} bytes - {data.hex()}')
        self.rfcomm_session.write(data)


# -----------------------------------------------------------------------------
async def tcp_server(tcp_port, rfcomm_session):
    """
    Run a TCP server on 127.0.0.1 that bridges clients to `rfcomm_session`.

    This coroutine does not return on its own: it serves until it is
    cancelled, at which point the server is closed.

    Args:
        tcp_port: TCP port to listen on.
        rfcomm_session: session handed to each `TcpServerProtocol`.
    """
    print(f'$$$ Starting TCP server on port {tcp_port}')

    server = await asyncio.get_running_loop().create_server(
        lambda: TcpServerProtocol(rfcomm_session), '127.0.0.1', tcp_port
    )

    # Serve until cancelled; the context manager closes the server on exit.
    async with server:
        await server.serve_forever()


# -----------------------------------------------------------------------------
async def main():
    """
    Entry point: connect to a peer over BR/EDR, then either list its RFCOMM
    channels ("discover") or open an RFCOMM session on the given channel and,
    if a TCP port was given, bridge that session to a local TCP server.
    """
    if len(sys.argv) < 5:
        print(
            'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
            '<bluetooth-address> <channel>|discover [tcp-port]'
        )
        print(
            ' specifying a channel number, or "discover" to list all RFCOMM channels'
        )
        print('example: run_rfcomm_client.py classic1.json usb:0 E1:CA:72:48:C4:E8 8')
        return

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
        print('<<< connected')

        # Create a device
        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
        device.classic_enabled = True
        await device.power_on()

        # Connect to a peer
        target_address = sys.argv[3]
        print(f'=== Connecting to {target_address}...')
        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
        print(f'=== Connected to {connection.peer_address}!')

        channel = sys.argv[4]
        if channel == 'discover':
            await list_rfcomm_channels(device, connection)
            return

        # Request authentication
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated')

        # Enable encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        # Create a client and start it
        print('@@@ Starting RFCOMM client...')
        rfcomm_client = Client(device, connection)
        rfcomm_mux = await rfcomm_client.start()
        print('@@@ Started')

        channel = int(channel)
        print(f'### Opening session for channel {channel}...')
        try:
            session = await rfcomm_mux.open_dlc(channel)
            print('### Session open', session)
        except bumble.core.ConnectionError as error:
            print(f'### Session open failed: {error}')
            await rfcomm_mux.disconnect()
            print('@@@ Disconnected from RFCOMM server')
            return

        # Keep a strong reference to the server task for as long as main()
        # runs: the event loop only holds weak references to tasks, so an
        # unreferenced task may be garbage-collected before it completes.
        tcp_server_task = None
        if len(sys.argv) == 6:
            # A TCP port was specified, start listening
            tcp_port = int(sys.argv[5])
            tcp_server_task = asyncio.create_task(tcp_server(tcp_port, session))

        await hci_source.wait_for_termination()

        # Stop the TCP bridge once the HCI transport has terminated
        if tcp_server_task is not None:
            tcp_server_task.cancel()


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())