1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import usb.core 21import usb.util 22import threading 23import time 24from colors import color 25 26from .common import Transport, ParserSource 27from .. import hci 28 29 30# ----------------------------------------------------------------------------- 31# Logging 32# ----------------------------------------------------------------------------- 33logger = logging.getLogger(__name__) 34 35 36# ----------------------------------------------------------------------------- 37async def open_pyusb_transport(spec): 38 ''' 39 Open a USB transport. [Implementation based on PyUSB] 40 The parameter string has this syntax: 41 either <index> or <vendor>:<product> 42 With <index> as the 0-based index to select amongst all the devices that appear 43 to be supporting Bluetooth HCI (0 being the first one), or 44 Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. 45 46 Examples: 47 0 --> the first BT USB dongle 48 04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901 49 ''' 50 51 USB_RECIPIENT_DEVICE = 0x00 52 USB_REQUEST_TYPE_CLASS = 0x01 << 5 53 USB_ENDPOINT_EVENTS_IN = 0x81 54 USB_ENDPOINT_ACL_IN = 0x82 55 USB_ENDPOINT_SCO_IN = 0x83 56 USB_ENDPOINT_ACL_OUT = 0x02 57 # USB_ENDPOINT_SCO_OUT = 0x03 58 USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0 59 USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01 60 USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01 61 62 READ_SIZE = 1024 63 READ_TIMEOUT = 1000 64 65 class UsbPacketSink: 66 def __init__(self, device): 67 self.device = device 68 self.thread = threading.Thread(target=self.run) 69 self.loop = asyncio.get_running_loop() 70 self.stop_event = None 71 72 def on_packet(self, packet): 73 # TODO: don't block here, just queue for the write thread 74 if len(packet) == 0: 75 logger.warning('packet too short') 76 return 77 78 packet_type = packet[0] 79 try: 80 if packet_type == hci.HCI_ACL_DATA_PACKET: 81 self.device.write(USB_ENDPOINT_ACL_OUT, packet[1:]) 82 elif packet_type == hci.HCI_COMMAND_PACKET: 83 self.device.ctrl_transfer(USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, 0, 0, 0, packet[1:]) 84 else: 85 logger.warning(color(f'unsupported packet type {packet_type}', 'red')) 86 except usb.core.USBTimeoutError: 87 logger.warning('USB Write Timeout') 88 except usb.core.USBError as error: 89 logger.warning(f'USB write error: {error}') 90 time.sleep(1) # Sleep one second to avoid busy looping 91 92 def start(self): 93 self.thread.start() 94 95 async def stop(self): 96 # Create stop events and wait for them to be signaled 97 self.stop_event = asyncio.Event() 98 await self.stop_event.wait() 99 100 def run(self): 101 while self.stop_event is None: 102 time.sleep(1) 103 self.loop.call_soon_threadsafe(lambda: self.stop_event.set()) 104 105 class UsbPacketSource(asyncio.Protocol, ParserSource): 106 def __init__(self, device, sco_enabled): 107 super().__init__() 108 self.device = device 109 self.loop = asyncio.get_running_loop() 110 self.queue = asyncio.Queue() 111 self.event_thread = threading.Thread( 112 target=self.run, 113 args=(USB_ENDPOINT_EVENTS_IN, hci.HCI_EVENT_PACKET) 114 ) 115 self.event_thread.stop_event = None 116 self.acl_thread = threading.Thread( 117 target=self.run, 118 args=(USB_ENDPOINT_ACL_IN, hci.HCI_ACL_DATA_PACKET) 119 ) 120 self.acl_thread.stop_event = None 121 122 # SCO support is optional 123 self.sco_enabled = sco_enabled 124 if sco_enabled: 125 self.sco_thread = threading.Thread( 126 target=self.run, 127 args=(USB_ENDPOINT_SCO_IN, hci.HCI_SYNCHRONOUS_DATA_PACKET) 128 ) 129 self.sco_thread.stop_event = None 130 131 def data_received(self, packet): 132 self.parser.feed_data(packet) 133 134 def enqueue(self, packet): 135 self.queue.put_nowait(packet) 136 137 async def dequeue(self): 138 while True: 139 try: 140 data = await self.queue.get() 141 except asyncio.CancelledError: 142 return 143 self.data_received(data) 144 145 def start(self): 146 self.dequeue_task = self.loop.create_task(self.dequeue()) 147 self.event_thread.start() 148 self.acl_thread.start() 149 if self.sco_enabled: 150 self.sco_thread.start() 151 152 async def stop(self): 153 # Stop the dequeuing task 154 self.dequeue_task.cancel() 155 156 # Create stop events and wait for them to be signaled 157 self.event_thread.stop_event = asyncio.Event() 158 self.acl_thread.stop_event = asyncio.Event() 159 await self.event_thread.stop_event.wait() 160 await self.acl_thread.stop_event.wait() 161 if self.sco_enabled: 162 await self.sco_thread.stop_event.wait() 163 164 def run(self, endpoint, packet_type): 165 # Read until asked to stop 166 current_thread = threading.current_thread() 167 while current_thread.stop_event is None: 168 try: 169 # Read, with a timeout of 1 second 170 data = self.device.read(endpoint, READ_SIZE, timeout=READ_TIMEOUT) 171 packet = bytes([packet_type]) + data.tobytes() 172 self.loop.call_soon_threadsafe(self.enqueue, packet) 173 except usb.core.USBTimeoutError: 174 continue 175 except usb.core.USBError: 176 # Don't log this: because pyusb doesn't really support multiple threads 177 # reading at the same time, we can get occasional USBError(errno=5) 178 # Input/Output errors reported, but they seem to be harmless. 179 # Until support for async or multi-thread support is added to pyusb, 180 # we'll just live with this as is... 181 # logger.warning(f'USB read error: {error}') 182 time.sleep(1) # Sleep one second to avoid busy looping 183 184 stop_event = current_thread.stop_event 185 self.loop.call_soon_threadsafe(lambda: stop_event.set()) 186 187 class UsbTransport(Transport): 188 def __init__(self, device, source, sink): 189 super().__init__(source, sink) 190 self.device = device 191 192 async def close(self): 193 await self.source.stop() 194 await self.sink.stop() 195 usb.util.release_interface(self.device, 0) 196 197 # Find the device according to the spec moniker 198 if ':' in spec: 199 vendor_id, product_id = spec.split(':') 200 device = usb.core.find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)) 201 else: 202 device_index = int(spec) 203 devices = list(usb.core.find( 204 find_all = 1, 205 bDeviceClass = USB_DEVICE_CLASS_WIRELESS_CONTROLLER, 206 bDeviceSubClass = USB_DEVICE_SUBCLASS_RF_CONTROLLER, 207 bDeviceProtocol = USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER 208 )) 209 if len(devices) > device_index: 210 device = devices[device_index] 211 else: 212 device = None 213 214 if device is None: 215 raise ValueError('device not found') 216 logger.debug(f'USB Device: {device}') 217 218 # Detach the kernel driver if needed 219 if device.is_kernel_driver_active(0): 220 logger.debug("detaching kernel driver") 221 device.detach_kernel_driver(0) 222 223 # Set configuration, if needed 224 try: 225 configuration = device.get_active_configuration() 226 except usb.core.USBError: 227 device.set_configuration() 228 configuration = device.get_active_configuration() 229 interface = configuration[(0, 0)] 230 logger.debug(f'USB Interface: {interface}') 231 usb.util.claim_interface(device, 0) 232 233 # Select an alternate setting for SCO, if available 234 sco_enabled = False 235 # NOTE: this is disabled for now, because SCO with alternate settings is broken, 236 # see: https://github.com/libusb/libusb/issues/36 237 # 238 # best_packet_size = 0 239 # best_interface = None 240 # sco_enabled = False 241 # for interface in configuration: 242 # iso_in_endpoint = None 243 # iso_out_endpoint = None 244 # for endpoint in interface: 245 # if (endpoint.bEndpointAddress == USB_ENDPOINT_SCO_IN and 246 # usb.util.endpoint_direction(endpoint.bEndpointAddress) == usb.util.ENDPOINT_IN and 247 # usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_ISO): 248 # iso_in_endpoint = endpoint 249 # continue 250 # if (endpoint.bEndpointAddress == USB_ENDPOINT_SCO_OUT and 251 # usb.util.endpoint_direction(endpoint.bEndpointAddress) == usb.util.ENDPOINT_OUT and 252 # usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_ISO): 253 # iso_out_endpoint = endpoint 254 255 # if iso_in_endpoint is not None and iso_out_endpoint is not None: 256 # if iso_out_endpoint.wMaxPacketSize > best_packet_size: 257 # best_packet_size = iso_out_endpoint.wMaxPacketSize 258 # best_interface = interface 259 260 # if best_interface is not None: 261 # logger.debug(f'SCO enabled, selecting alternate setting (wMaxPacketSize={best_packet_size}): {best_interface}') 262 # sco_enabled = True 263 # try: 264 # device.set_interface_altsetting( 265 # interface = best_interface.bInterfaceNumber, 266 # alternate_setting = best_interface.bAlternateSetting 267 # ) 268 # except usb.USBError: 269 # logger.warning('failed to set alternate setting') 270 271 packet_source = UsbPacketSource(device, sco_enabled) 272 packet_sink = UsbPacketSink(device) 273 packet_source.start() 274 packet_sink.start() 275 276 return UsbTransport(device, packet_source, packet_sink)