• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import threading
21import collections
22import ctypes
23import platform
24
25import usb1
26
27from .common import Transport, ParserSource
28from .. import hci
29from ..colors import color
30
31
32# -----------------------------------------------------------------------------
33# Logging
34# -----------------------------------------------------------------------------
35logger = logging.getLogger(__name__)
36
37
38# -----------------------------------------------------------------------------
39def load_libusb():
40    '''
41    Attempt to load the libusb-1.0 C library from libusb_package in site-packages.
42    If the library exists, we create a DLL object and initialize the usb1 backend.
43    This only needs to be done once, but before a usb1.USBContext is created.
44    If the library does not exists, do nothing and usb1 will search default system paths
45    when usb1.USBContext is created.
46    '''
47    try:
48        import libusb_package
49    except ImportError:
50        logger.debug('libusb_package is not available')
51    else:
52        if libusb_path := libusb_package.get_library_path():
53            logger.debug(f'loading libusb library at {libusb_path}')
54            dll_loader = (
55                ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
56            )
57            libusb_dll = dll_loader(
58                str(libusb_path), use_errno=True, use_last_error=True
59            )
60            usb1.loadLibrary(libusb_dll)
61
62
63async def open_usb_transport(spec):
64    '''
65    Open a USB transport.
66    The moniker string has this syntax:
67    either <index> or
68    <vendor>:<product> or
69    <vendor>:<product>/<serial-number>] or
70    <vendor>:<product>#<index>
71    With <index> as the 0-based index to select amongst all the devices that appear
72    to be supporting Bluetooth HCI (0 being the first one), or
73    Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
74    /<serial-number> suffix or #<index> suffix max be specified when more than one
75    device with the same vendor and product identifiers are present.
76
77    In addition, if the moniker ends with the symbol "!", the device will be used in
78    "forced" mode:
79    the first USB interface of the device will be used, regardless of the interface
80    class/subclass.
81    This may be useful for some devices that use a custom class/subclass but may
82    nonetheless work as-is.
83
84    Examples:
85    0 --> the first BT USB dongle
86    04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
87    04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
88    04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and
89    serial number 00E04C239987
90    usb:0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
91    '''
92
93    # pylint: disable=invalid-name
94    USB_RECIPIENT_DEVICE = 0x00
95    USB_REQUEST_TYPE_CLASS = 0x01 << 5
96    USB_DEVICE_CLASS_DEVICE = 0x00
97    USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
98    USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
99    USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
100    USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
101    USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
102    USB_ENDPOINT_IN = 0x80
103
104    USB_BT_HCI_CLASS_TUPLE = (
105        USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
106        USB_DEVICE_SUBCLASS_RF_CONTROLLER,
107        USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER,
108    )
109
110    READ_SIZE = 1024
111
112    class UsbPacketSink:
113        def __init__(self, device, acl_out):
114            self.device = device
115            self.acl_out = acl_out
116            self.transfer = device.getTransfer()
117            self.packets = collections.deque()  # Queue of packets waiting to be sent
118            self.loop = asyncio.get_running_loop()
119            self.cancel_done = self.loop.create_future()
120            self.closed = False
121
122        def start(self):
123            pass
124
125        def on_packet(self, packet):
126            # Ignore packets if we're closed
127            if self.closed:
128                return
129
130            if len(packet) == 0:
131                logger.warning('packet too short')
132                return
133
134            # Queue the packet
135            self.packets.append(packet)
136            if len(self.packets) == 1:
137                # The queue was previously empty, re-prime the pump
138                self.process_queue()
139
140        def on_packet_sent(self, transfer):
141            status = transfer.getStatus()
142            # logger.debug(f'<<< USB out transfer callback: status={status}')
143
144            # pylint: disable=no-member
145            if status == usb1.TRANSFER_COMPLETED:
146                self.loop.call_soon_threadsafe(self.on_packet_sent_)
147            elif status == usb1.TRANSFER_CANCELLED:
148                self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
149            else:
150                logger.warning(
151                    color(f'!!! out transfer not completed: status={status}', 'red')
152                )
153
154        def on_packet_sent_(self):
155            if self.packets:
156                self.packets.popleft()
157                self.process_queue()
158
159        def process_queue(self):
160            if len(self.packets) == 0:
161                return  # Nothing to do
162
163            packet = self.packets[0]
164            packet_type = packet[0]
165            if packet_type == hci.HCI_ACL_DATA_PACKET:
166                self.transfer.setBulk(
167                    self.acl_out, packet[1:], callback=self.on_packet_sent
168                )
169                logger.debug('submit ACL')
170                self.transfer.submit()
171            elif packet_type == hci.HCI_COMMAND_PACKET:
172                self.transfer.setControl(
173                    USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
174                    0,
175                    0,
176                    0,
177                    packet[1:],
178                    callback=self.on_packet_sent,
179                )
180                logger.debug('submit COMMAND')
181                self.transfer.submit()
182            else:
183                logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
184
185        def close(self):
186            self.closed = True
187
188        async def terminate(self):
189            if not self.closed:
190                self.close()
191
192            # Empty the packet queue so that we don't send any more data
193            self.packets.clear()
194
195            # If we have a transfer in flight, cancel it
196            if self.transfer.isSubmitted():
197                # Try to cancel the transfer, but that may fail because it may have
198                # already completed
199                try:
200                    self.transfer.cancel()
201
202                    logger.debug('waiting for OUT transfer cancellation to be done...')
203                    await self.cancel_done
204                    logger.debug('OUT transfer cancellation done')
205                except usb1.USBError:
206                    logger.debug('OUT transfer likely already completed')
207
208    class UsbPacketSource(asyncio.Protocol, ParserSource):
209        def __init__(self, context, device, acl_in, events_in):
210            super().__init__()
211            self.context = context
212            self.device = device
213            self.acl_in = acl_in
214            self.events_in = events_in
215            self.loop = asyncio.get_running_loop()
216            self.queue = asyncio.Queue()
217            self.dequeue_task = None
218            self.closed = False
219            self.event_loop_done = self.loop.create_future()
220            self.cancel_done = {
221                hci.HCI_EVENT_PACKET: self.loop.create_future(),
222                hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
223            }
224            self.events_in_transfer = None
225            self.acl_in_transfer = None
226
227            # Create a thread to process events
228            self.event_thread = threading.Thread(target=self.run)
229
230        def start(self):
231            # Set up transfer objects for input
232            self.events_in_transfer = device.getTransfer()
233            self.events_in_transfer.setInterrupt(
234                self.events_in,
235                READ_SIZE,
236                callback=self.on_packet_received,
237                user_data=hci.HCI_EVENT_PACKET,
238            )
239            self.events_in_transfer.submit()
240
241            self.acl_in_transfer = device.getTransfer()
242            self.acl_in_transfer.setBulk(
243                self.acl_in,
244                READ_SIZE,
245                callback=self.on_packet_received,
246                user_data=hci.HCI_ACL_DATA_PACKET,
247            )
248            self.acl_in_transfer.submit()
249
250            self.dequeue_task = self.loop.create_task(self.dequeue())
251            self.event_thread.start()
252
253        def on_packet_received(self, transfer):
254            packet_type = transfer.getUserData()
255            status = transfer.getStatus()
256            # logger.debug(
257            #     f'<<< USB IN transfer callback: status={status} '
258            #     f'packet_type={packet_type} '
259            #     f'length={transfer.getActualLength()}'
260            # )
261
262            # pylint: disable=no-member
263            if status == usb1.TRANSFER_COMPLETED:
264                packet = (
265                    bytes([packet_type])
266                    + transfer.getBuffer()[: transfer.getActualLength()]
267                )
268                self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
269            elif status == usb1.TRANSFER_CANCELLED:
270                self.loop.call_soon_threadsafe(
271                    self.cancel_done[packet_type].set_result, None
272                )
273                return
274            else:
275                logger.warning(
276                    color(f'!!! transfer not completed: status={status}', 'red')
277                )
278
279            # Re-submit the transfer so we can receive more data
280            transfer.submit()
281
282        async def dequeue(self):
283            while not self.closed:
284                try:
285                    packet = await self.queue.get()
286                except asyncio.CancelledError:
287                    return
288                self.parser.feed_data(packet)
289
290        def run(self):
291            logger.debug('starting USB event loop')
292            while (
293                self.events_in_transfer.isSubmitted()
294                or self.acl_in_transfer.isSubmitted()
295            ):
296                # pylint: disable=no-member
297                try:
298                    self.context.handleEvents()
299                except usb1.USBErrorInterrupted:
300                    pass
301
302            logger.debug('USB event loop done')
303            self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
304
305        def close(self):
306            self.closed = True
307
308        async def terminate(self):
309            if not self.closed:
310                self.close()
311
312            self.dequeue_task.cancel()
313
314            # Cancel the transfers
315            for transfer in (self.events_in_transfer, self.acl_in_transfer):
316                if transfer.isSubmitted():
317                    # Try to cancel the transfer, but that may fail because it may have
318                    # already completed
319                    packet_type = transfer.getUserData()
320                    try:
321                        transfer.cancel()
322                        logger.debug(
323                            f'waiting for IN[{packet_type}] transfer cancellation '
324                            'to be done...'
325                        )
326                        await self.cancel_done[packet_type]
327                        logger.debug(f'IN[{packet_type}] transfer cancellation done')
328                    except usb1.USBError:
329                        logger.debug(
330                            f'IN[{packet_type}] transfer likely already completed'
331                        )
332
333            # Wait for the thread to terminate
334            await self.event_loop_done
335
336    class UsbTransport(Transport):
337        def __init__(self, context, device, interface, setting, source, sink):
338            super().__init__(source, sink)
339            self.context = context
340            self.device = device
341            self.interface = interface
342
343            # Get exclusive access
344            device.claimInterface(interface)
345
346            # Set the alternate setting if not the default
347            if setting != 0:
348                device.setInterfaceAltSetting(interface, setting)
349
350            # The source and sink can now start
351            source.start()
352            sink.start()
353
354        async def close(self):
355            self.source.close()
356            self.sink.close()
357            await self.source.terminate()
358            await self.sink.terminate()
359            self.device.releaseInterface(self.interface)
360            self.device.close()
361            self.context.close()
362
363    # Find the device according to the spec moniker
364    load_libusb()
365    context = usb1.USBContext()
366    context.open()
367    try:
368        found = None
369
370        if spec.endswith('!'):
371            spec = spec[:-1]
372            forced_mode = True
373        else:
374            forced_mode = False
375
376        if ':' in spec:
377            vendor_id, product_id = spec.split(':')
378            serial_number = None
379            device_index = 0
380            if '/' in product_id:
381                product_id, serial_number = product_id.split('/')
382            elif '#' in product_id:
383                product_id, device_index_str = product_id.split('#')
384                device_index = int(device_index_str)
385
386            for device in context.getDeviceIterator(skip_on_error=True):
387                try:
388                    device_serial_number = device.getSerialNumber()
389                except usb1.USBError:
390                    device_serial_number = None
391                if (
392                    device.getVendorID() == int(vendor_id, 16)
393                    and device.getProductID() == int(product_id, 16)
394                    and (serial_number is None or serial_number == device_serial_number)
395                ):
396                    if device_index == 0:
397                        found = device
398                        break
399                    device_index -= 1
400                device.close()
401        else:
402            # Look for a compatible device by index
403            def device_is_bluetooth_hci(device):
404                # Check if the device class indicates a match
405                if (
406                    device.getDeviceClass(),
407                    device.getDeviceSubClass(),
408                    device.getDeviceProtocol(),
409                ) == USB_BT_HCI_CLASS_TUPLE:
410                    return True
411
412                # If the device class is 'Device', look for a matching interface
413                if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
414                    for configuration in device:
415                        for interface in configuration:
416                            for setting in interface:
417                                if (
418                                    setting.getClass(),
419                                    setting.getSubClass(),
420                                    setting.getProtocol(),
421                                ) == USB_BT_HCI_CLASS_TUPLE:
422                                    return True
423
424                return False
425
426            device_index = int(spec)
427            for device in context.getDeviceIterator(skip_on_error=True):
428                if device_is_bluetooth_hci(device):
429                    if device_index == 0:
430                        found = device
431                        break
432                    device_index -= 1
433                device.close()
434
435        if found is None:
436            context.close()
437            raise ValueError('device not found')
438
439        logger.debug(f'USB Device: {found}')
440
441        # Look for the first interface with the right class and endpoints
442        def find_endpoints(device):
443            # pylint: disable-next=too-many-nested-blocks
444            for (configuration_index, configuration) in enumerate(device):
445                interface = None
446                for interface in configuration:
447                    setting = None
448                    for setting in interface:
449                        if (
450                            not forced_mode
451                            and (
452                                setting.getClass(),
453                                setting.getSubClass(),
454                                setting.getProtocol(),
455                            )
456                            != USB_BT_HCI_CLASS_TUPLE
457                        ):
458                            continue
459
460                        events_in = None
461                        acl_in = None
462                        acl_out = None
463                        for endpoint in setting:
464                            attributes = endpoint.getAttributes()
465                            address = endpoint.getAddress()
466                            if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
467                                if address & USB_ENDPOINT_IN and acl_in is None:
468                                    acl_in = address
469                                elif acl_out is None:
470                                    acl_out = address
471                            elif (
472                                attributes & 0x03
473                                == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT
474                            ):
475                                if address & USB_ENDPOINT_IN and events_in is None:
476                                    events_in = address
477
478                        # Return if we found all 3 endpoints
479                        if (
480                            acl_in is not None
481                            and acl_out is not None
482                            and events_in is not None
483                        ):
484                            return (
485                                configuration_index + 1,
486                                setting.getNumber(),
487                                setting.getAlternateSetting(),
488                                acl_in,
489                                acl_out,
490                                events_in,
491                            )
492
493                        logger.debug(
494                            f'skipping configuration {configuration_index + 1} / '
495                            f'interface {setting.getNumber()}'
496                        )
497
498            return None
499
500        endpoints = find_endpoints(found)
501        if endpoints is None:
502            raise ValueError('no compatible interface found for device')
503        (configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
504        logger.debug(
505            f'selected endpoints: configuration={configuration}, '
506            f'interface={interface}, '
507            f'setting={setting}, '
508            f'acl_in=0x{acl_in:02X}, '
509            f'acl_out=0x{acl_out:02X}, '
510            f'events_in=0x{events_in:02X}, '
511        )
512
513        device = found.open()
514
515        # Auto-detach the kernel driver if supported
516        # pylint: disable=no-member
517        if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
518            try:
519                logger.debug('auto-detaching kernel driver')
520                device.setAutoDetachKernelDriver(True)
521            except usb1.USBError as error:
522                logger.warning(f'unable to auto-detach kernel driver: {error}')
523
524        # Set the configuration if needed
525        try:
526            current_configuration = device.getConfiguration()
527            logger.debug(f'current configuration = {current_configuration}')
528        except usb1.USBError:
529            current_configuration = 0
530
531        if current_configuration != configuration:
532            try:
533                logger.debug(f'setting configuration {configuration}')
534                device.setConfiguration(configuration)
535            except usb1.USBError:
536                logger.warning('failed to set configuration')
537
538        source = UsbPacketSource(context, device, acl_in, events_in)
539        sink = UsbPacketSink(device, acl_out)
540        return UsbTransport(context, device, interface, setting, source, sink)
541    except usb1.USBError as error:
542        logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
543        context.close()
544        raise
545