# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import threading
import collections
import ctypes
import platform

import usb1

from .common import Transport, ParserSource
from .. import hci
from ..colors import color


# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
def load_libusb():
    '''
    Attempt to load the libusb-1.0 C library from libusb_package in site-packages.
    If the library exists, we create a DLL object and initialize the usb1 backend.
    This only needs to be done once, but before a usb1.USBContext is created.
    If the library does not exist, do nothing and usb1 will search default system
    paths when usb1.USBContext is created.
    '''
    try:
        import libusb_package
    except ImportError:
        logger.debug('libusb_package is not available')
    else:
        if libusb_path := libusb_package.get_library_path():
            logger.debug(f'loading libusb library at {libusb_path}')
            dll_loader = (
                ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
            )
            libusb_dll = dll_loader(
                str(libusb_path), use_errno=True, use_last_error=True
            )
            usb1.loadLibrary(libusb_dll)


async def open_usb_transport(spec):
    '''
    Open a USB transport.
    The moniker string has this syntax:
    either <index> or
    <vendor>:<product> or
    <vendor>:<product>/<serial-number> or
    <vendor>:<product>#<index>
    With <index> as the 0-based index to select amongst all the devices that appear
    to be supporting Bluetooth HCI (0 being the first one), or
    Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
    /<serial-number> suffix or #<index> suffix may be specified when more than one
    device with the same vendor and product identifiers are present.

    In addition, if the moniker ends with the symbol "!", the device will be used in
    "forced" mode:
    the first USB interface of the device will be used, regardless of the interface
    class/subclass.
    This may be useful for some devices that use a custom class/subclass but may
    nonetheless work as-is.

    Examples:
    0 --> the first BT USB dongle
    04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
    04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
    04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and
    serial number 00E04C239987
    0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.

    Returns a transport object wrapping the opened device, with its packet source
    and sink already started.

    Raises ValueError if the moniker cannot be parsed, if no matching device is
    found, or if the device has no compatible interface. usb1.USBError exceptions
    raised while opening the device are logged and re-raised. In all failure
    cases, the USB context created by this function is closed before the
    exception propagates.
    '''

    # pylint: disable=invalid-name
    USB_RECIPIENT_DEVICE = 0x00
    USB_REQUEST_TYPE_CLASS = 0x01 << 5
    USB_DEVICE_CLASS_DEVICE = 0x00
    USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
    USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
    USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
    USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
    USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
    USB_ENDPOINT_IN = 0x80

    USB_BT_HCI_CLASS_TUPLE = (
        USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
        USB_DEVICE_SUBCLASS_RF_CONTROLLER,
        USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER,
    )

    READ_SIZE = 1024

    class UsbPacketSink:
        '''
        Sends outgoing HCI packets to the device, one USB transfer at a time.

        Packets are queued and a single transfer object is reused: the packet at
        the head of the queue is the one currently in flight, and the next one is
        submitted when the completion callback fires.
        '''

        def __init__(self, device, acl_out):
            self.device = device
            self.acl_out = acl_out
            self.transfer = device.getTransfer()
            self.packets = collections.deque()  # Queue of packets waiting to be sent
            self.loop = asyncio.get_running_loop()
            self.cancel_done = self.loop.create_future()
            self.closed = False

        def start(self):
            pass

        def on_packet(self, packet):
            '''Queue a packet (type byte followed by payload) for transmission.'''
            # Ignore packets if we're closed
            if self.closed:
                return

            if len(packet) == 0:
                logger.warning('packet too short')
                return

            # Queue the packet
            self.packets.append(packet)
            if len(self.packets) == 1:
                # The queue was previously empty, re-prime the pump
                self.process_queue()

        def on_packet_sent(self, transfer):
            '''
            Transfer completion callback.

            The work is handed off to the asyncio loop with call_soon_threadsafe,
            since this callback is not expected to run on the loop's thread.
            '''
            status = transfer.getStatus()
            # logger.debug(f'<<< USB out transfer callback: status={status}')

            # pylint: disable=no-member
            if status == usb1.TRANSFER_CANCELLED:
                self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
                return

            if status != usb1.TRANSFER_COMPLETED:
                logger.warning(
                    color(f'!!! out transfer not completed: status={status}', 'red')
                )

            # Whether the transfer succeeded or failed, the packet at the head of
            # the queue is done: move on, so that a single failed transfer doesn't
            # stall all subsequent packets forever.
            self.loop.call_soon_threadsafe(self.on_packet_sent_)

        def on_packet_sent_(self):
            '''Drop the packet that was just handled and submit the next one.'''
            if self.packets:
                self.packets.popleft()
                self.process_queue()

        def process_queue(self):
            '''
            Submit the packet at the head of the queue, if any.

            The submitted packet stays at the head of the queue until its transfer
            callback fires. Packets with an unsupported type are logged and
            dropped, so that they don't block the packets queued behind them.
            '''
            while self.packets:
                packet = self.packets[0]
                packet_type = packet[0]
                if packet_type == hci.HCI_ACL_DATA_PACKET:
                    self.transfer.setBulk(
                        self.acl_out, packet[1:], callback=self.on_packet_sent
                    )
                    logger.debug('submit ACL')
                    self.transfer.submit()
                    return

                if packet_type == hci.HCI_COMMAND_PACKET:
                    self.transfer.setControl(
                        USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
                        0,
                        0,
                        0,
                        packet[1:],
                        callback=self.on_packet_sent,
                    )
                    logger.debug('submit COMMAND')
                    self.transfer.submit()
                    return

                logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
                self.packets.popleft()

        def close(self):
            self.closed = True

        async def terminate(self):
            '''Stop sending, and wait for any in-flight transfer to be cancelled.'''
            if not self.closed:
                self.close()

            # Empty the packet queue so that we don't send any more data
            self.packets.clear()

            # If we have a transfer in flight, cancel it
            if self.transfer.isSubmitted():
                # Try to cancel the transfer, but that may fail because it may have
                # already completed
                try:
                    self.transfer.cancel()

                    logger.debug('waiting for OUT transfer cancellation to be done...')
                    await self.cancel_done
                    logger.debug('OUT transfer cancellation done')
                except usb1.USBError:
                    logger.debug('OUT transfer likely already completed')

    class UsbPacketSource(asyncio.Protocol, ParserSource):
        '''
        Receives HCI events and ACL data from the device.

        Two IN transfers (one interrupt, one bulk) are kept submitted, and a
        dedicated thread runs the USB context's event handling. Received packets
        are forwarded to the asyncio loop through a queue, and fed to the parser
        from a task running on that loop.
        '''

        def __init__(self, context, device, acl_in, events_in):
            super().__init__()
            self.context = context
            self.device = device
            self.acl_in = acl_in
            self.events_in = events_in
            self.loop = asyncio.get_running_loop()
            self.queue = asyncio.Queue()
            self.dequeue_task = None
            self.closed = False
            self.event_loop_done = self.loop.create_future()
            # One future per IN transfer, keyed by the packet type it carries
            self.cancel_done = {
                hci.HCI_EVENT_PACKET: self.loop.create_future(),
                hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
            }
            self.events_in_transfer = None
            self.acl_in_transfer = None

            # Create a thread to process events
            self.event_thread = threading.Thread(target=self.run)

        def start(self):
            '''Submit the IN transfers and start the dequeue task and event thread.'''
            # Set up transfer objects for input.
            # NOTE: use the device handle this object was constructed with, not a
            # variable captured from the enclosing function's scope.
            self.events_in_transfer = self.device.getTransfer()
            self.events_in_transfer.setInterrupt(
                self.events_in,
                READ_SIZE,
                callback=self.on_packet_received,
                user_data=hci.HCI_EVENT_PACKET,
            )
            self.events_in_transfer.submit()

            self.acl_in_transfer = self.device.getTransfer()
            self.acl_in_transfer.setBulk(
                self.acl_in,
                READ_SIZE,
                callback=self.on_packet_received,
                user_data=hci.HCI_ACL_DATA_PACKET,
            )
            self.acl_in_transfer.submit()

            self.dequeue_task = self.loop.create_task(self.dequeue())
            self.event_thread.start()

        def on_packet_received(self, transfer):
            '''
            Transfer completion callback for both IN transfers.

            The transfer's user data is the HCI packet type it carries. Results are
            handed off to the asyncio loop with call_soon_threadsafe.
            '''
            packet_type = transfer.getUserData()
            status = transfer.getStatus()
            # logger.debug(
            #     f'<<< USB IN transfer callback: status={status} '
            #     f'packet_type={packet_type} '
            #     f'length={transfer.getActualLength()}'
            # )

            # pylint: disable=no-member
            if status == usb1.TRANSFER_COMPLETED:
                # Prefix the payload with the packet type before queueing it
                packet = (
                    bytes([packet_type])
                    + transfer.getBuffer()[: transfer.getActualLength()]
                )
                self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
            elif status == usb1.TRANSFER_CANCELLED:
                # A cancelled transfer is not re-submitted
                self.loop.call_soon_threadsafe(
                    self.cancel_done[packet_type].set_result, None
                )
                return
            else:
                logger.warning(
                    color(f'!!! transfer not completed: status={status}', 'red')
                )

            # Re-submit the transfer so we can receive more data
            transfer.submit()

        async def dequeue(self):
            '''Feed queued packets to the parser until closed or cancelled.'''
            while not self.closed:
                try:
                    packet = await self.queue.get()
                except asyncio.CancelledError:
                    return
                self.parser.feed_data(packet)

        def run(self):
            '''Thread body: handle USB events while an IN transfer is submitted.'''
            logger.debug('starting USB event loop')
            while (
                self.events_in_transfer.isSubmitted()
                or self.acl_in_transfer.isSubmitted()
            ):
                # pylint: disable=no-member
                try:
                    self.context.handleEvents()
                except usb1.USBErrorInterrupted:
                    pass

            logger.debug('USB event loop done')
            self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)

        def close(self):
            self.closed = True

        async def terminate(self):
            '''Cancel the IN transfers and wait for the event thread to finish.'''
            if not self.closed:
                self.close()

            self.dequeue_task.cancel()

            # Cancel the transfers
            for transfer in (self.events_in_transfer, self.acl_in_transfer):
                if transfer.isSubmitted():
                    # Try to cancel the transfer, but that may fail because it may have
                    # already completed
                    packet_type = transfer.getUserData()
                    try:
                        transfer.cancel()
                        logger.debug(
                            f'waiting for IN[{packet_type}] transfer cancellation '
                            'to be done...'
                        )
                        await self.cancel_done[packet_type]
                        logger.debug(f'IN[{packet_type}] transfer cancellation done')
                    except usb1.USBError:
                        logger.debug(
                            f'IN[{packet_type}] transfer likely already completed'
                        )

            # Wait for the thread to terminate
            await self.event_loop_done

    class UsbTransport(Transport):
        '''
        Transport that owns the USB context, the opened device and the claimed
        interface, and releases them on close.
        '''

        def __init__(self, context, device, interface, setting, source, sink):
            super().__init__(source, sink)
            self.context = context
            self.device = device
            self.interface = interface

            # Get exclusive access
            device.claimInterface(interface)

            # Set the alternate setting if not the default
            if setting != 0:
                device.setInterfaceAltSetting(interface, setting)

            # The source and sink can now start
            source.start()
            sink.start()

        async def close(self):
            self.source.close()
            self.sink.close()
            await self.source.terminate()
            await self.sink.terminate()
            self.device.releaseInterface(self.interface)
            self.device.close()
            self.context.close()

    # Find the device according to the spec moniker
    load_libusb()
    context = usb1.USBContext()
    context.open()
    try:
        found = None

        if spec.endswith('!'):
            spec = spec[:-1]
            forced_mode = True
        else:
            forced_mode = False

        if ':' in spec:
            vendor_id, product_id = spec.split(':')
            serial_number = None
            device_index = 0
            if '/' in product_id:
                product_id, serial_number = product_id.split('/')
            elif '#' in product_id:
                product_id, device_index_str = product_id.split('#')
                device_index = int(device_index_str)

            # Parse the hexadecimal identifiers once, not once per device
            wanted_vendor_id = int(vendor_id, 16)
            wanted_product_id = int(product_id, 16)

            def get_serial_number(device):
                # Return the device's serial number, or None if it can't be read
                try:
                    return device.getSerialNumber()
                except usb1.USBError:
                    return None

            for device in context.getDeviceIterator(skip_on_error=True):
                # Only read the serial number when one was requested, and only
                # from devices that already match the vendor and product IDs
                if (
                    device.getVendorID() == wanted_vendor_id
                    and device.getProductID() == wanted_product_id
                    and (
                        serial_number is None
                        or serial_number == get_serial_number(device)
                    )
                ):
                    if device_index == 0:
                        found = device
                        break
                    device_index -= 1
                device.close()
        else:
            # Look for a compatible device by index
            def device_is_bluetooth_hci(device):
                # Check if the device class indicates a match
                if (
                    device.getDeviceClass(),
                    device.getDeviceSubClass(),
                    device.getDeviceProtocol(),
                ) == USB_BT_HCI_CLASS_TUPLE:
                    return True

                # If the device class is 'Device', look for a matching interface
                if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
                    for configuration in device:
                        for interface in configuration:
                            for setting in interface:
                                if (
                                    setting.getClass(),
                                    setting.getSubClass(),
                                    setting.getProtocol(),
                                ) == USB_BT_HCI_CLASS_TUPLE:
                                    return True

                return False

            device_index = int(spec)
            for device in context.getDeviceIterator(skip_on_error=True):
                if device_is_bluetooth_hci(device):
                    if device_index == 0:
                        found = device
                        break
                    device_index -= 1
                device.close()

        if found is None:
            # The context is closed by the exception handler below
            raise ValueError('device not found')

        logger.debug(f'USB Device: {found}')

        # Look for the first interface with the right class and endpoints
        def find_endpoints(device):
            # Returns a tuple (configuration value, interface number, alternate
            # setting, acl_in, acl_out, events_in) for the first setting that has
            # all 3 endpoints, or None if there isn't one.
            # pylint: disable-next=too-many-nested-blocks
            for (configuration_index, configuration) in enumerate(device):
                for interface in configuration:
                    for setting in interface:
                        if (
                            not forced_mode
                            and (
                                setting.getClass(),
                                setting.getSubClass(),
                                setting.getProtocol(),
                            )
                            != USB_BT_HCI_CLASS_TUPLE
                        ):
                            continue

                        events_in = None
                        acl_in = None
                        acl_out = None
                        for endpoint in setting:
                            attributes = endpoint.getAttributes()
                            address = endpoint.getAddress()
                            if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
                                # Check the direction first, so that an extra IN
                                # endpoint can never be selected as the OUT endpoint
                                if address & USB_ENDPOINT_IN:
                                    if acl_in is None:
                                        acl_in = address
                                elif acl_out is None:
                                    acl_out = address
                            elif (
                                attributes & 0x03
                                == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT
                            ):
                                if address & USB_ENDPOINT_IN and events_in is None:
                                    events_in = address

                        # Return if we found all 3 endpoints
                        if (
                            acl_in is not None
                            and acl_out is not None
                            and events_in is not None
                        ):
                            return (
                                configuration_index + 1,
                                setting.getNumber(),
                                setting.getAlternateSetting(),
                                acl_in,
                                acl_out,
                                events_in,
                            )

                        logger.debug(
                            f'skipping configuration {configuration_index + 1} / '
                            f'interface {setting.getNumber()}'
                        )

            return None

        endpoints = find_endpoints(found)
        if endpoints is None:
            raise ValueError('no compatible interface found for device')
        (configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
        logger.debug(
            f'selected endpoints: configuration={configuration}, '
            f'interface={interface}, '
            f'setting={setting}, '
            f'acl_in=0x{acl_in:02X}, '
            f'acl_out=0x{acl_out:02X}, '
            f'events_in=0x{events_in:02X}, '
        )

        device = found.open()

        # Auto-detach the kernel driver if supported
        # pylint: disable=no-member
        if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
            try:
                logger.debug('auto-detaching kernel driver')
                device.setAutoDetachKernelDriver(True)
            except usb1.USBError as error:
                logger.warning(f'unable to auto-detach kernel driver: {error}')

        # Set the configuration if needed
        try:
            current_configuration = device.getConfiguration()
            logger.debug(f'current configuration = {current_configuration}')
        except usb1.USBError:
            current_configuration = 0

        if current_configuration != configuration:
            try:
                logger.debug(f'setting configuration {configuration}')
                device.setConfiguration(configuration)
            except usb1.USBError:
                logger.warning('failed to set configuration')

        source = UsbPacketSource(context, device, acl_in, events_in)
        sink = UsbPacketSink(device, acl_out)
        return UsbTransport(context, device, interface, setting, source, sink)
    except usb1.USBError as error:
        logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
        context.close()
        raise
    except Exception:
        # Any other failure (malformed moniker, device not found, no compatible
        # interface, ...) must not leak the USB context either
        context.close()
        raise