1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import threading 21import collections 22import ctypes 23import platform 24 25import usb1 26 27from bumble.transport.common import Transport, ParserSource 28from bumble import hci 29from bumble.colors import color 30from bumble.utils import AsyncRunner 31 32 33# ----------------------------------------------------------------------------- 34# Logging 35# ----------------------------------------------------------------------------- 36logger = logging.getLogger(__name__) 37 38 39# ----------------------------------------------------------------------------- 40def load_libusb(): 41 ''' 42 Attempt to load the libusb-1.0 C library from libusb_package in site-packages. 43 If the library exists, we create a DLL object and initialize the usb1 backend. 44 This only needs to be done once, but before a usb1.USBContext is created. 45 If the library does not exists, do nothing and usb1 will search default system paths 46 when usb1.USBContext is created. 47 ''' 48 try: 49 import libusb_package 50 except ImportError: 51 logger.debug('libusb_package is not available') 52 else: 53 if libusb_path := libusb_package.get_library_path(): 54 logger.debug(f'loading libusb library at {libusb_path}') 55 dll_loader = ( 56 ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL 57 ) 58 libusb_dll = dll_loader( 59 str(libusb_path), use_errno=True, use_last_error=True 60 ) 61 usb1.loadLibrary(libusb_dll) 62 63 64async def open_usb_transport(spec: str) -> Transport: 65 ''' 66 Open a USB transport. 67 The moniker string has this syntax: 68 either <index> or 69 <vendor>:<product> or 70 <vendor>:<product>/<serial-number>] or 71 <vendor>:<product>#<index> 72 With <index> as the 0-based index to select amongst all the devices that appear 73 to be supporting Bluetooth HCI (0 being the first one), or 74 Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The 75 /<serial-number> suffix or #<index> suffix max be specified when more than one 76 device with the same vendor and product identifiers are present. 77 78 In addition, if the moniker ends with the symbol "!", the device will be used in 79 "forced" mode: 80 the first USB interface of the device will be used, regardless of the interface 81 class/subclass. 82 This may be useful for some devices that use a custom class/subclass but may 83 nonetheless work as-is. 84 85 Examples: 86 0 --> the first BT USB dongle 87 04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901 88 04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901 89 04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and 90 serial number 00E04C239987 91 usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode. 92 ''' 93 94 # pylint: disable=invalid-name 95 USB_RECIPIENT_DEVICE = 0x00 96 USB_REQUEST_TYPE_CLASS = 0x01 << 5 97 USB_DEVICE_CLASS_DEVICE = 0x00 98 USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0 99 USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01 100 USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01 101 USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02 102 USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03 103 USB_ENDPOINT_IN = 0x80 104 105 USB_BT_HCI_CLASS_TUPLE = ( 106 USB_DEVICE_CLASS_WIRELESS_CONTROLLER, 107 USB_DEVICE_SUBCLASS_RF_CONTROLLER, 108 USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER, 109 ) 110 111 READ_SIZE = 4096 112 113 class UsbPacketSink: 114 def __init__(self, device, acl_out): 115 self.device = device 116 self.acl_out = acl_out 117 self.acl_out_transfer = device.getTransfer() 118 self.packets = collections.deque() # Queue of packets waiting to be sent 119 self.loop = asyncio.get_running_loop() 120 self.cancel_done = self.loop.create_future() 121 self.closed = False 122 123 def start(self): 124 pass 125 126 def on_packet(self, packet): 127 # Ignore packets if we're closed 128 if self.closed: 129 return 130 131 if len(packet) == 0: 132 logger.warning('packet too short') 133 return 134 135 # Queue the packet 136 self.packets.append(packet) 137 if len(self.packets) == 1: 138 # The queue was previously empty, re-prime the pump 139 self.process_queue() 140 141 def transfer_callback(self, transfer): 142 status = transfer.getStatus() 143 144 # pylint: disable=no-member 145 if status == usb1.TRANSFER_COMPLETED: 146 self.loop.call_soon_threadsafe(self.on_packet_sent) 147 elif status == usb1.TRANSFER_CANCELLED: 148 self.loop.call_soon_threadsafe(self.cancel_done.set_result, None) 149 else: 150 logger.warning( 151 color(f'!!! OUT transfer not completed: status={status}', 'red') 152 ) 153 154 def on_packet_sent(self): 155 if self.packets: 156 self.packets.popleft() 157 self.process_queue() 158 159 def process_queue(self): 160 if len(self.packets) == 0: 161 return # Nothing to do 162 163 packet = self.packets[0] 164 packet_type = packet[0] 165 if packet_type == hci.HCI_ACL_DATA_PACKET: 166 self.acl_out_transfer.setBulk( 167 self.acl_out, packet[1:], callback=self.transfer_callback 168 ) 169 self.acl_out_transfer.submit() 170 elif packet_type == hci.HCI_COMMAND_PACKET: 171 self.acl_out_transfer.setControl( 172 USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, 173 0, 174 0, 175 0, 176 packet[1:], 177 callback=self.transfer_callback, 178 ) 179 self.acl_out_transfer.submit() 180 else: 181 logger.warning(color(f'unsupported packet type {packet_type}', 'red')) 182 183 def close(self): 184 self.closed = True 185 186 async def terminate(self): 187 if not self.closed: 188 self.close() 189 190 # Empty the packet queue so that we don't send any more data 191 self.packets.clear() 192 193 # If we have a transfer in flight, cancel it 194 if self.acl_out_transfer.isSubmitted(): 195 # Try to cancel the transfer, but that may fail because it may have 196 # already completed 197 try: 198 self.acl_out_transfer.cancel() 199 200 logger.debug('waiting for OUT transfer cancellation to be done...') 201 await self.cancel_done 202 logger.debug('OUT transfer cancellation done') 203 except usb1.USBError: 204 logger.debug('OUT transfer likely already completed') 205 206 class UsbPacketSource(asyncio.Protocol, ParserSource): 207 def __init__(self, device, metadata, acl_in, events_in): 208 super().__init__() 209 self.device = device 210 self.metadata = metadata 211 self.acl_in = acl_in 212 self.acl_in_transfer = None 213 self.events_in = events_in 214 self.events_in_transfer = None 215 self.loop = asyncio.get_running_loop() 216 self.queue = asyncio.Queue() 217 self.dequeue_task = None 218 self.cancel_done = { 219 hci.HCI_EVENT_PACKET: self.loop.create_future(), 220 hci.HCI_ACL_DATA_PACKET: self.loop.create_future(), 221 } 222 self.closed = False 223 224 def start(self): 225 # Set up transfer objects for input 226 self.events_in_transfer = device.getTransfer() 227 self.events_in_transfer.setInterrupt( 228 self.events_in, 229 READ_SIZE, 230 callback=self.transfer_callback, 231 user_data=hci.HCI_EVENT_PACKET, 232 ) 233 self.events_in_transfer.submit() 234 235 self.acl_in_transfer = device.getTransfer() 236 self.acl_in_transfer.setBulk( 237 self.acl_in, 238 READ_SIZE, 239 callback=self.transfer_callback, 240 user_data=hci.HCI_ACL_DATA_PACKET, 241 ) 242 self.acl_in_transfer.submit() 243 244 self.dequeue_task = self.loop.create_task(self.dequeue()) 245 246 @property 247 def usb_transfer_submitted(self): 248 return ( 249 self.events_in_transfer.isSubmitted() 250 or self.acl_in_transfer.isSubmitted() 251 ) 252 253 def transfer_callback(self, transfer): 254 packet_type = transfer.getUserData() 255 status = transfer.getStatus() 256 257 # pylint: disable=no-member 258 if status == usb1.TRANSFER_COMPLETED: 259 packet = ( 260 bytes([packet_type]) 261 + transfer.getBuffer()[: transfer.getActualLength()] 262 ) 263 self.loop.call_soon_threadsafe(self.queue.put_nowait, packet) 264 265 # Re-submit the transfer so we can receive more data 266 transfer.submit() 267 elif status == usb1.TRANSFER_CANCELLED: 268 self.loop.call_soon_threadsafe( 269 self.cancel_done[packet_type].set_result, None 270 ) 271 else: 272 logger.warning( 273 color(f'!!! IN transfer not completed: status={status}', 'red') 274 ) 275 self.loop.call_soon_threadsafe(self.on_transport_lost) 276 277 async def dequeue(self): 278 while not self.closed: 279 try: 280 packet = await self.queue.get() 281 except asyncio.CancelledError: 282 return 283 self.parser.feed_data(packet) 284 285 def close(self): 286 self.closed = True 287 288 async def terminate(self): 289 if not self.closed: 290 self.close() 291 292 self.dequeue_task.cancel() 293 294 # Cancel the transfers 295 for transfer in (self.events_in_transfer, self.acl_in_transfer): 296 if transfer.isSubmitted(): 297 # Try to cancel the transfer, but that may fail because it may have 298 # already completed 299 packet_type = transfer.getUserData() 300 try: 301 transfer.cancel() 302 logger.debug( 303 f'waiting for IN[{packet_type}] transfer cancellation ' 304 'to be done...' 305 ) 306 await self.cancel_done[packet_type] 307 logger.debug(f'IN[{packet_type}] transfer cancellation done') 308 except usb1.USBError: 309 logger.debug( 310 f'IN[{packet_type}] transfer likely already completed' 311 ) 312 313 class UsbTransport(Transport): 314 def __init__(self, context, device, interface, setting, source, sink): 315 super().__init__(source, sink) 316 self.context = context 317 self.device = device 318 self.interface = interface 319 self.loop = asyncio.get_running_loop() 320 self.event_loop_done = self.loop.create_future() 321 322 # Get exclusive access 323 device.claimInterface(interface) 324 325 # Set the alternate setting if not the default 326 if setting != 0: 327 device.setInterfaceAltSetting(interface, setting) 328 329 # The source and sink can now start 330 source.start() 331 sink.start() 332 333 # Create a thread to process events 334 self.event_thread = threading.Thread(target=self.run) 335 self.event_thread.start() 336 337 def run(self): 338 logger.debug('starting USB event loop') 339 while self.source.usb_transfer_submitted: 340 # pylint: disable=no-member 341 try: 342 self.context.handleEvents() 343 except usb1.USBErrorInterrupted: 344 pass 345 346 logger.debug('USB event loop done') 347 self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None) 348 349 async def close(self): 350 self.source.close() 351 self.sink.close() 352 await self.source.terminate() 353 await self.sink.terminate() 354 self.device.releaseInterface(self.interface) 355 self.device.close() 356 self.context.close() 357 358 # Wait for the thread to terminate 359 await self.event_loop_done 360 361 # Find the device according to the spec moniker 362 load_libusb() 363 context = usb1.USBContext() 364 context.open() 365 try: 366 found = None 367 368 if spec.endswith('!'): 369 spec = spec[:-1] 370 forced_mode = True 371 else: 372 forced_mode = False 373 374 if ':' in spec: 375 vendor_id, product_id = spec.split(':') 376 serial_number = None 377 device_index = 0 378 if '/' in product_id: 379 product_id, serial_number = product_id.split('/') 380 elif '#' in product_id: 381 product_id, device_index_str = product_id.split('#') 382 device_index = int(device_index_str) 383 384 for device in context.getDeviceIterator(skip_on_error=True): 385 try: 386 device_serial_number = device.getSerialNumber() 387 except usb1.USBError: 388 device_serial_number = None 389 if ( 390 device.getVendorID() == int(vendor_id, 16) 391 and device.getProductID() == int(product_id, 16) 392 and (serial_number is None or serial_number == device_serial_number) 393 ): 394 if device_index == 0: 395 found = device 396 break 397 device_index -= 1 398 device.close() 399 elif '-' in spec: 400 401 def device_path(device): 402 return f'{device.getBusNumber()}-{".".join(map(str, device.getPortNumberList()))}' 403 404 for device in context.getDeviceIterator(skip_on_error=True): 405 if device_path(device) == spec: 406 found = device 407 break 408 device.close() 409 else: 410 # Look for a compatible device by index 411 def device_is_bluetooth_hci(device): 412 # Check if the device class indicates a match 413 if ( 414 device.getDeviceClass(), 415 device.getDeviceSubClass(), 416 device.getDeviceProtocol(), 417 ) == USB_BT_HCI_CLASS_TUPLE: 418 return True 419 420 # If the device class is 'Device', look for a matching interface 421 if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE: 422 for configuration in device: 423 for interface in configuration: 424 for setting in interface: 425 if ( 426 setting.getClass(), 427 setting.getSubClass(), 428 setting.getProtocol(), 429 ) == USB_BT_HCI_CLASS_TUPLE: 430 return True 431 432 return False 433 434 device_index = int(spec) 435 for device in context.getDeviceIterator(skip_on_error=True): 436 if device_is_bluetooth_hci(device): 437 if device_index == 0: 438 found = device 439 break 440 device_index -= 1 441 device.close() 442 443 if found is None: 444 context.close() 445 raise ValueError('device not found') 446 447 logger.debug(f'USB Device: {found}') 448 449 # Look for the first interface with the right class and endpoints 450 def find_endpoints(device): 451 # pylint: disable-next=too-many-nested-blocks 452 for configuration_index, configuration in enumerate(device): 453 interface = None 454 for interface in configuration: 455 setting = None 456 for setting in interface: 457 if ( 458 not forced_mode 459 and ( 460 setting.getClass(), 461 setting.getSubClass(), 462 setting.getProtocol(), 463 ) 464 != USB_BT_HCI_CLASS_TUPLE 465 ): 466 continue 467 468 events_in = None 469 acl_in = None 470 acl_out = None 471 for endpoint in setting: 472 attributes = endpoint.getAttributes() 473 address = endpoint.getAddress() 474 if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK: 475 if address & USB_ENDPOINT_IN and acl_in is None: 476 acl_in = address 477 elif acl_out is None: 478 acl_out = address 479 elif ( 480 attributes & 0x03 481 == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT 482 ): 483 if address & USB_ENDPOINT_IN and events_in is None: 484 events_in = address 485 486 # Return if we found all 3 endpoints 487 if ( 488 acl_in is not None 489 and acl_out is not None 490 and events_in is not None 491 ): 492 return ( 493 configuration_index + 1, 494 setting.getNumber(), 495 setting.getAlternateSetting(), 496 acl_in, 497 acl_out, 498 events_in, 499 ) 500 501 logger.debug( 502 f'skipping configuration {configuration_index + 1} / ' 503 f'interface {setting.getNumber()}' 504 ) 505 506 return None 507 508 endpoints = find_endpoints(found) 509 if endpoints is None: 510 raise ValueError('no compatible interface found for device') 511 (configuration, interface, setting, acl_in, acl_out, events_in) = endpoints 512 logger.debug( 513 f'selected endpoints: configuration={configuration}, ' 514 f'interface={interface}, ' 515 f'setting={setting}, ' 516 f'acl_in=0x{acl_in:02X}, ' 517 f'acl_out=0x{acl_out:02X}, ' 518 f'events_in=0x{events_in:02X}, ' 519 ) 520 521 device_metadata = { 522 'vendor_id': found.getVendorID(), 523 'product_id': found.getProductID(), 524 } 525 device = found.open() 526 527 # Auto-detach the kernel driver if supported 528 # pylint: disable=no-member 529 if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER): 530 try: 531 logger.debug('auto-detaching kernel driver') 532 device.setAutoDetachKernelDriver(True) 533 except usb1.USBError as error: 534 logger.warning(f'unable to auto-detach kernel driver: {error}') 535 536 # Set the configuration if needed 537 try: 538 current_configuration = device.getConfiguration() 539 logger.debug(f'current configuration = {current_configuration}') 540 except usb1.USBError: 541 current_configuration = 0 542 543 if current_configuration != configuration: 544 try: 545 logger.debug(f'setting configuration {configuration}') 546 device.setConfiguration(configuration) 547 except usb1.USBError: 548 logger.warning('failed to set configuration') 549 550 source = UsbPacketSource(device, device_metadata, acl_in, events_in) 551 sink = UsbPacketSink(device, acl_out) 552 return UsbTransport(context, device, interface, setting, source, sink) 553 except usb1.USBError as error: 554 logger.warning(color(f'!!! failed to open USB device: {error}', 'red')) 555 context.close() 556 raise 557