• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import usb.core
21import usb.util
22import threading
23import time
24from colors import color
25
26from .common import Transport, ParserSource
27from .. import hci
28
29
30# -----------------------------------------------------------------------------
31# Logging
32# -----------------------------------------------------------------------------
33logger = logging.getLogger(__name__)
34
35
36# -----------------------------------------------------------------------------
async def open_pyusb_transport(spec):
    '''
    Open a USB transport. [Implementation based on PyUSB]

    The parameter string has this syntax:
    either <index> or <vendor>:<product>
    With <index> as the 0-based index to select amongst all the devices that appear
    to be supporting Bluetooth HCI (0 being the first one), or
    Where <vendor> and <product> are the vendor ID and product ID in hexadecimal.

    Examples:
    0 --> the first BT USB dongle
    04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901

    Args:
        spec: the device moniker string, in one of the two forms described above.

    Returns:
        A transport wrapping the selected device. Its packet source and packet
        sink have already been started when it is returned; call `close()` on it
        to stop them and release the USB interface.

    Raises:
        ValueError: if `spec` cannot be parsed as an index or as a pair of
            hexadecimal IDs, or if no matching device is found (this includes
            an index that is negative or past the end of the device list).

    Errors raised by PyUSB while detaching the kernel driver, configuring the
    device or claiming the interface are not caught here.
    '''

    USB_RECIPIENT_DEVICE                             = 0x00
    USB_REQUEST_TYPE_CLASS                           = 0x01 << 5
    USB_ENDPOINT_EVENTS_IN                           = 0x81
    USB_ENDPOINT_ACL_IN                              = 0x82
    USB_ENDPOINT_SCO_IN                              = 0x83
    USB_ENDPOINT_ACL_OUT                             = 0x02
    #  USB_ENDPOINT_SCO_OUT                             = 0x03
    USB_DEVICE_CLASS_WIRELESS_CONTROLLER             = 0xE0
    USB_DEVICE_SUBCLASS_RF_CONTROLLER                = 0x01
    USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01

    READ_SIZE    = 1024
    READ_TIMEOUT = 1000

    class UsbPacketSink:
        '''
        Packet sink that writes HCI packets to the USB device.

        Writes are done synchronously from `on_packet`. The thread owned by this
        object does no I/O: it only waits until `stop()` has been called, and
        then signals the stop event back on the event loop.
        '''

        def __init__(self, device):
            self.device     = device
            self.thread     = threading.Thread(target=self.run)
            self.loop       = asyncio.get_running_loop()
            self.stop_event = None  # Created by stop(), polled by run()

        def on_packet(self, packet):
            '''
            Send one packet to the device.

            The first byte of `packet` is the HCI packet type and selects how
            the remaining bytes are sent: ACL data is written to the ACL OUT
            endpoint, commands are sent as a class request on the control
            endpoint. Other packet types are logged and dropped. USB errors are
            logged, never raised.
            '''
            # TODO: don't block here, just queue for the write thread
            if not packet:
                logger.warning('packet too short')
                return

            packet_type = packet[0]
            payload = packet[1:]  # The packet type byte is not sent over USB
            try:
                if packet_type == hci.HCI_ACL_DATA_PACKET:
                    self.device.write(USB_ENDPOINT_ACL_OUT, payload)
                elif packet_type == hci.HCI_COMMAND_PACKET:
                    self.device.ctrl_transfer(USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS, 0, 0, 0, payload)
                else:
                    logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
            except usb.core.USBTimeoutError:
                logger.warning('USB Write Timeout')
            except usb.core.USBError as error:
                logger.warning('USB write error: %s', error)
                time.sleep(1)  # Sleep one second to avoid busy looping

        def start(self):
            '''Start the sink thread.'''
            self.thread.start()

        async def stop(self):
            '''Ask the sink thread to exit and wait until it has signaled back.'''
            # Create stop events and wait for them to be signaled
            self.stop_event = asyncio.Event()
            await self.stop_event.wait()

        def run(self):
            '''Thread body: poll once per second until a stop is requested.'''
            while self.stop_event is None:
                time.sleep(1)

            # asyncio.Event is not thread-safe: set it from the event loop thread
            self.loop.call_soon_threadsafe(self.stop_event.set)

    class UsbPacketSource(asyncio.Protocol, ParserSource):
        '''
        Packet source that reads HCI packets from the USB device.

        One reader thread is created per IN endpoint (events, ACL, and
        optionally SCO). Each thread prefixes what it reads with the matching
        HCI packet type byte and hands it to the event loop, where a dequeuing
        task feeds it to the parser.
        '''

        def __init__(self, device, sco_enabled):
            super().__init__()
            self.device       = device
            self.loop         = asyncio.get_running_loop()
            self.queue        = asyncio.Queue()
            self.event_thread = threading.Thread(
                target=self.run,
                args=(USB_ENDPOINT_EVENTS_IN, hci.HCI_EVENT_PACKET)
            )
            self.event_thread.stop_event = None
            self.acl_thread = threading.Thread(
                target=self.run,
                args=(USB_ENDPOINT_ACL_IN, hci.HCI_ACL_DATA_PACKET)
            )
            self.acl_thread.stop_event = None

            # All the reader threads, so that start() and stop() always
            # operate on the same set of threads
            self.reader_threads = [self.event_thread, self.acl_thread]

            # SCO support is optional
            self.sco_enabled = sco_enabled
            if sco_enabled:
                self.sco_thread = threading.Thread(
                    target=self.run,
                    args=(USB_ENDPOINT_SCO_IN, hci.HCI_SYNCHRONOUS_DATA_PACKET)
                )
                self.sco_thread.stop_event = None
                self.reader_threads.append(self.sco_thread)

        def data_received(self, packet):
            '''Feed one received packet to the parser.'''
            self.parser.feed_data(packet)

        def enqueue(self, packet):
            '''Queue a packet for the dequeuing task (called on the event loop).'''
            self.queue.put_nowait(packet)

        async def dequeue(self):
            '''Forward queued packets to `data_received` until cancelled.'''
            while True:
                try:
                    data = await self.queue.get()
                except asyncio.CancelledError:
                    return
                self.data_received(data)

        def start(self):
            '''Start the dequeuing task and all the reader threads.'''
            self.dequeue_task = self.loop.create_task(self.dequeue())
            for thread in self.reader_threads:
                thread.start()

        async def stop(self):
            '''Cancel the dequeuing task, then stop every reader thread.'''
            # Stop the dequeuing task
            self.dequeue_task.cancel()

            # Create a stop event for every reader thread (including the SCO
            # thread when enabled) before waiting on any of them, so that all
            # the threads wind down concurrently
            for thread in self.reader_threads:
                thread.stop_event = asyncio.Event()
            for thread in self.reader_threads:
                await thread.stop_event.wait()

        def run(self, endpoint, packet_type):
            '''
            Thread body: read from `endpoint` until asked to stop.

            Each chunk read is prefixed with `packet_type` and passed to
            `enqueue` on the event loop thread.
            '''
            # Read until asked to stop
            current_thread = threading.current_thread()
            while current_thread.stop_event is None:
                try:
                    # Read, with a timeout of 1 second
                    data = self.device.read(endpoint, READ_SIZE, timeout=READ_TIMEOUT)
                    packet = bytes([packet_type]) + data.tobytes()
                    self.loop.call_soon_threadsafe(self.enqueue, packet)
                except usb.core.USBTimeoutError:
                    continue
                except usb.core.USBError:
                    # Don't log this: because pyusb doesn't really support multiple threads
                    # reading at the same time, we can get occasional USBError(errno=5)
                    # Input/Output errors reported, but they seem to be harmless.
                    # Until support for async or multi-thread support is added to pyusb,
                    # we'll just live with this as is...
                    # logger.warning(f'USB read error: {error}')
                    time.sleep(1)  # Sleep one second to avoid busy looping

            # asyncio.Event is not thread-safe: set it from the event loop thread
            stop_event = current_thread.stop_event
            self.loop.call_soon_threadsafe(stop_event.set)

    class UsbTransport(Transport):
        '''Transport that also releases the USB interface when closed.'''

        def __init__(self, device, source, sink):
            super().__init__(source, sink)
            self.device = device

        async def close(self):
            '''Stop the source and the sink, then release interface 0.'''
            try:
                await self.source.stop()
                await self.sink.stop()
            finally:
                # Release the interface even if stopping failed or was cancelled
                usb.util.release_interface(self.device, 0)

    # Find the device according to the spec moniker
    if ':' in spec:
        vendor_id, product_id = spec.split(':')
        device = usb.core.find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
    else:
        device_index = int(spec)
        devices = list(usb.core.find(
            find_all        = True,
            bDeviceClass    = USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
            bDeviceSubClass = USB_DEVICE_SUBCLASS_RF_CONTROLLER,
            bDeviceProtocol = USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
        ))
        # The index is 0-based: a negative value must not wrap around and
        # silently select a device counted from the end of the list
        if 0 <= device_index < len(devices):
            device = devices[device_index]
        else:
            device = None

    if device is None:
        raise ValueError('device not found')
    logger.debug('USB Device: %s', device)

    # Detach the kernel driver if needed
    if device.is_kernel_driver_active(0):
        logger.debug("detaching kernel driver")
        device.detach_kernel_driver(0)

    # Set configuration, if needed
    try:
        configuration = device.get_active_configuration()
    except usb.core.USBError:
        device.set_configuration()
        configuration = device.get_active_configuration()
    interface = configuration[(0, 0)]
    logger.debug('USB Interface: %s', interface)
    usb.util.claim_interface(device, 0)

    # Select an alternate setting for SCO, if available
    sco_enabled = False
    # NOTE: this is disabled for now, because SCO with alternate settings is broken,
    # see: https://github.com/libusb/libusb/issues/36
    #
    # best_packet_size = 0
    # best_interface = None
    # sco_enabled = False
    # for interface in configuration:
    #     iso_in_endpoint = None
    #     iso_out_endpoint = None
    #     for endpoint in interface:
    #         if (endpoint.bEndpointAddress == USB_ENDPOINT_SCO_IN and
    #             usb.util.endpoint_direction(endpoint.bEndpointAddress) == usb.util.ENDPOINT_IN and
    #             usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_ISO):
    #             iso_in_endpoint = endpoint
    #             continue
    #         if (endpoint.bEndpointAddress == USB_ENDPOINT_SCO_OUT and
    #             usb.util.endpoint_direction(endpoint.bEndpointAddress) == usb.util.ENDPOINT_OUT and
    #             usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_ISO):
    #             iso_out_endpoint = endpoint

    #     if iso_in_endpoint is not None and iso_out_endpoint is not None:
    #         if iso_out_endpoint.wMaxPacketSize > best_packet_size:
    #             best_packet_size = iso_out_endpoint.wMaxPacketSize
    #             best_interface = interface

    # if best_interface is not None:
    #     logger.debug(f'SCO enabled, selecting alternate setting (wMaxPacketSize={best_packet_size}): {best_interface}')
    #     sco_enabled = True
    #     try:
    #         device.set_interface_altsetting(
    #             interface = best_interface.bInterfaceNumber,
    #             alternate_setting = best_interface.bAlternateSetting
    #         )
    #     except usb.USBError:
    #         logger.warning('failed to set alternate setting')

    packet_source = UsbPacketSource(device, sco_enabled)
    packet_sink = UsbPacketSink(device)
    packet_source.start()
    packet_sink.start()

    return UsbTransport(device, packet_source, packet_sink)