• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import logging
20import threading
21import collections
22import ctypes
23import platform
24
25import usb1
26
27from bumble.transport.common import Transport, ParserSource
28from bumble import hci
29from bumble.colors import color
30from bumble.utils import AsyncRunner
31
32
33# -----------------------------------------------------------------------------
34# Logging
35# -----------------------------------------------------------------------------
36logger = logging.getLogger(__name__)
37
38
39# -----------------------------------------------------------------------------
def load_libusb():
    '''
    Point usb1 at the libusb-1.0 library shipped by libusb_package, if any.

    When libusb_package can be imported and reports a library path, that
    library is loaded with ctypes (WinDLL on Windows, CDLL elsewhere) and handed
    to usb1.loadLibrary(). This must happen before a usb1.USBContext is created.
    When libusb_package is missing, or reports no library path, nothing is
    loaded here and usb1 is left to search the default system paths when
    usb1.USBContext is created.
    '''
    try:
        import libusb_package
    except ImportError:
        logger.debug('libusb_package is not available')
        return

    libusb_path = libusb_package.get_library_path()
    if not libusb_path:
        # No bundled library: leave usb1 with its default lookup
        return

    logger.debug(f'loading libusb library at {libusb_path}')
    if platform.system() == 'Windows':
        dll_loader = ctypes.WinDLL
    else:
        dll_loader = ctypes.CDLL
    usb1.loadLibrary(
        dll_loader(str(libusb_path), use_errno=True, use_last_error=True)
    )
62
63
64async def open_usb_transport(spec: str) -> Transport:
65    '''
66    Open a USB transport.
    The moniker string has one of these forms:
    <index> or
    <vendor>:<product> or
    <vendor>:<product>/<serial-number> or
    <vendor>:<product>#<index> or
    <bus>-<port>[.<port>...]
    Where <index>, when used on its own, is the 0-based index to select amongst all
    the devices that appear to be supporting Bluetooth HCI (0 being the first one).
    <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
    /<serial-number> suffix or #<index> suffix may be specified when more than one
    device with the same vendor and product identifiers is present.
    <bus>-<port>[.<port>...] selects the device by its bus number followed by its
    dot-separated list of port numbers.
77
78    In addition, if the moniker ends with the symbol "!", the device will be used in
79    "forced" mode:
80    the first USB interface of the device will be used, regardless of the interface
81    class/subclass.
82    This may be useful for some devices that use a custom class/subclass but may
83    nonetheless work as-is.
84
85    Examples:
86    0 --> the first BT USB dongle
87    04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
88    04b4:f901#2 --> the third USB device with vendor=04b4 and product=f901
89    04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and
90    serial number 00E04C239987
    0B05:17CB! --> the BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
92    '''
93
94    # pylint: disable=invalid-name
95    USB_RECIPIENT_DEVICE = 0x00
96    USB_REQUEST_TYPE_CLASS = 0x01 << 5
97    USB_DEVICE_CLASS_DEVICE = 0x00
98    USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
99    USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
100    USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
101    USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
102    USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
103    USB_ENDPOINT_IN = 0x80
104
105    USB_BT_HCI_CLASS_TUPLE = (
106        USB_DEVICE_CLASS_WIRELESS_CONTROLLER,
107        USB_DEVICE_SUBCLASS_RF_CONTROLLER,
108        USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER,
109    )
110
111    READ_SIZE = 4096
112
113    class UsbPacketSink:
114        def __init__(self, device, acl_out):
115            self.device = device
116            self.acl_out = acl_out
117            self.acl_out_transfer = device.getTransfer()
118            self.packets = collections.deque()  # Queue of packets waiting to be sent
119            self.loop = asyncio.get_running_loop()
120            self.cancel_done = self.loop.create_future()
121            self.closed = False
122
123        def start(self):
124            pass
125
126        def on_packet(self, packet):
127            # Ignore packets if we're closed
128            if self.closed:
129                return
130
131            if len(packet) == 0:
132                logger.warning('packet too short')
133                return
134
135            # Queue the packet
136            self.packets.append(packet)
137            if len(self.packets) == 1:
138                # The queue was previously empty, re-prime the pump
139                self.process_queue()
140
141        def transfer_callback(self, transfer):
142            status = transfer.getStatus()
143
144            # pylint: disable=no-member
145            if status == usb1.TRANSFER_COMPLETED:
146                self.loop.call_soon_threadsafe(self.on_packet_sent)
147            elif status == usb1.TRANSFER_CANCELLED:
148                self.loop.call_soon_threadsafe(self.cancel_done.set_result, None)
149            else:
150                logger.warning(
151                    color(f'!!! OUT transfer not completed: status={status}', 'red')
152                )
153
154        def on_packet_sent(self):
155            if self.packets:
156                self.packets.popleft()
157                self.process_queue()
158
159        def process_queue(self):
160            if len(self.packets) == 0:
161                return  # Nothing to do
162
163            packet = self.packets[0]
164            packet_type = packet[0]
165            if packet_type == hci.HCI_ACL_DATA_PACKET:
166                self.acl_out_transfer.setBulk(
167                    self.acl_out, packet[1:], callback=self.transfer_callback
168                )
169                self.acl_out_transfer.submit()
170            elif packet_type == hci.HCI_COMMAND_PACKET:
171                self.acl_out_transfer.setControl(
172                    USB_RECIPIENT_DEVICE | USB_REQUEST_TYPE_CLASS,
173                    0,
174                    0,
175                    0,
176                    packet[1:],
177                    callback=self.transfer_callback,
178                )
179                self.acl_out_transfer.submit()
180            else:
181                logger.warning(color(f'unsupported packet type {packet_type}', 'red'))
182
183        def close(self):
184            self.closed = True
185
186        async def terminate(self):
187            if not self.closed:
188                self.close()
189
190            # Empty the packet queue so that we don't send any more data
191            self.packets.clear()
192
193            # If we have a transfer in flight, cancel it
194            if self.acl_out_transfer.isSubmitted():
195                # Try to cancel the transfer, but that may fail because it may have
196                # already completed
197                try:
198                    self.acl_out_transfer.cancel()
199
200                    logger.debug('waiting for OUT transfer cancellation to be done...')
201                    await self.cancel_done
202                    logger.debug('OUT transfer cancellation done')
203                except usb1.USBError:
204                    logger.debug('OUT transfer likely already completed')
205
206    class UsbPacketSource(asyncio.Protocol, ParserSource):
207        def __init__(self, device, metadata, acl_in, events_in):
208            super().__init__()
209            self.device = device
210            self.metadata = metadata
211            self.acl_in = acl_in
212            self.acl_in_transfer = None
213            self.events_in = events_in
214            self.events_in_transfer = None
215            self.loop = asyncio.get_running_loop()
216            self.queue = asyncio.Queue()
217            self.dequeue_task = None
218            self.cancel_done = {
219                hci.HCI_EVENT_PACKET: self.loop.create_future(),
220                hci.HCI_ACL_DATA_PACKET: self.loop.create_future(),
221            }
222            self.closed = False
223
224        def start(self):
225            # Set up transfer objects for input
226            self.events_in_transfer = device.getTransfer()
227            self.events_in_transfer.setInterrupt(
228                self.events_in,
229                READ_SIZE,
230                callback=self.transfer_callback,
231                user_data=hci.HCI_EVENT_PACKET,
232            )
233            self.events_in_transfer.submit()
234
235            self.acl_in_transfer = device.getTransfer()
236            self.acl_in_transfer.setBulk(
237                self.acl_in,
238                READ_SIZE,
239                callback=self.transfer_callback,
240                user_data=hci.HCI_ACL_DATA_PACKET,
241            )
242            self.acl_in_transfer.submit()
243
244            self.dequeue_task = self.loop.create_task(self.dequeue())
245
246        @property
247        def usb_transfer_submitted(self):
248            return (
249                self.events_in_transfer.isSubmitted()
250                or self.acl_in_transfer.isSubmitted()
251            )
252
253        def transfer_callback(self, transfer):
254            packet_type = transfer.getUserData()
255            status = transfer.getStatus()
256
257            # pylint: disable=no-member
258            if status == usb1.TRANSFER_COMPLETED:
259                packet = (
260                    bytes([packet_type])
261                    + transfer.getBuffer()[: transfer.getActualLength()]
262                )
263                self.loop.call_soon_threadsafe(self.queue.put_nowait, packet)
264
265                # Re-submit the transfer so we can receive more data
266                transfer.submit()
267            elif status == usb1.TRANSFER_CANCELLED:
268                self.loop.call_soon_threadsafe(
269                    self.cancel_done[packet_type].set_result, None
270                )
271            else:
272                logger.warning(
273                    color(f'!!! IN transfer not completed: status={status}', 'red')
274                )
275                self.loop.call_soon_threadsafe(self.on_transport_lost)
276
277        async def dequeue(self):
278            while not self.closed:
279                try:
280                    packet = await self.queue.get()
281                except asyncio.CancelledError:
282                    return
283                self.parser.feed_data(packet)
284
285        def close(self):
286            self.closed = True
287
288        async def terminate(self):
289            if not self.closed:
290                self.close()
291
292            self.dequeue_task.cancel()
293
294            # Cancel the transfers
295            for transfer in (self.events_in_transfer, self.acl_in_transfer):
296                if transfer.isSubmitted():
297                    # Try to cancel the transfer, but that may fail because it may have
298                    # already completed
299                    packet_type = transfer.getUserData()
300                    try:
301                        transfer.cancel()
302                        logger.debug(
303                            f'waiting for IN[{packet_type}] transfer cancellation '
304                            'to be done...'
305                        )
306                        await self.cancel_done[packet_type]
307                        logger.debug(f'IN[{packet_type}] transfer cancellation done')
308                    except usb1.USBError:
309                        logger.debug(
310                            f'IN[{packet_type}] transfer likely already completed'
311                        )
312
313    class UsbTransport(Transport):
314        def __init__(self, context, device, interface, setting, source, sink):
315            super().__init__(source, sink)
316            self.context = context
317            self.device = device
318            self.interface = interface
319            self.loop = asyncio.get_running_loop()
320            self.event_loop_done = self.loop.create_future()
321
322            # Get exclusive access
323            device.claimInterface(interface)
324
325            # Set the alternate setting if not the default
326            if setting != 0:
327                device.setInterfaceAltSetting(interface, setting)
328
329            # The source and sink can now start
330            source.start()
331            sink.start()
332
333            # Create a thread to process events
334            self.event_thread = threading.Thread(target=self.run)
335            self.event_thread.start()
336
337        def run(self):
338            logger.debug('starting USB event loop')
339            while self.source.usb_transfer_submitted:
340                # pylint: disable=no-member
341                try:
342                    self.context.handleEvents()
343                except usb1.USBErrorInterrupted:
344                    pass
345
346            logger.debug('USB event loop done')
347            self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
348
349        async def close(self):
350            self.source.close()
351            self.sink.close()
352            await self.source.terminate()
353            await self.sink.terminate()
354            self.device.releaseInterface(self.interface)
355            self.device.close()
356            self.context.close()
357
358            # Wait for the thread to terminate
359            await self.event_loop_done
360
361    # Find the device according to the spec moniker
362    load_libusb()
363    context = usb1.USBContext()
364    context.open()
365    try:
366        found = None
367
368        if spec.endswith('!'):
369            spec = spec[:-1]
370            forced_mode = True
371        else:
372            forced_mode = False
373
374        if ':' in spec:
375            vendor_id, product_id = spec.split(':')
376            serial_number = None
377            device_index = 0
378            if '/' in product_id:
379                product_id, serial_number = product_id.split('/')
380            elif '#' in product_id:
381                product_id, device_index_str = product_id.split('#')
382                device_index = int(device_index_str)
383
384            for device in context.getDeviceIterator(skip_on_error=True):
385                try:
386                    device_serial_number = device.getSerialNumber()
387                except usb1.USBError:
388                    device_serial_number = None
389                if (
390                    device.getVendorID() == int(vendor_id, 16)
391                    and device.getProductID() == int(product_id, 16)
392                    and (serial_number is None or serial_number == device_serial_number)
393                ):
394                    if device_index == 0:
395                        found = device
396                        break
397                    device_index -= 1
398                device.close()
399        elif '-' in spec:
400
401            def device_path(device):
402                return f'{device.getBusNumber()}-{".".join(map(str, device.getPortNumberList()))}'
403
404            for device in context.getDeviceIterator(skip_on_error=True):
405                if device_path(device) == spec:
406                    found = device
407                    break
408                device.close()
409        else:
410            # Look for a compatible device by index
411            def device_is_bluetooth_hci(device):
412                # Check if the device class indicates a match
413                if (
414                    device.getDeviceClass(),
415                    device.getDeviceSubClass(),
416                    device.getDeviceProtocol(),
417                ) == USB_BT_HCI_CLASS_TUPLE:
418                    return True
419
420                # If the device class is 'Device', look for a matching interface
421                if device.getDeviceClass() == USB_DEVICE_CLASS_DEVICE:
422                    for configuration in device:
423                        for interface in configuration:
424                            for setting in interface:
425                                if (
426                                    setting.getClass(),
427                                    setting.getSubClass(),
428                                    setting.getProtocol(),
429                                ) == USB_BT_HCI_CLASS_TUPLE:
430                                    return True
431
432                return False
433
434            device_index = int(spec)
435            for device in context.getDeviceIterator(skip_on_error=True):
436                if device_is_bluetooth_hci(device):
437                    if device_index == 0:
438                        found = device
439                        break
440                    device_index -= 1
441                device.close()
442
443        if found is None:
444            context.close()
445            raise ValueError('device not found')
446
447        logger.debug(f'USB Device: {found}')
448
449        # Look for the first interface with the right class and endpoints
450        def find_endpoints(device):
451            # pylint: disable-next=too-many-nested-blocks
452            for configuration_index, configuration in enumerate(device):
453                interface = None
454                for interface in configuration:
455                    setting = None
456                    for setting in interface:
457                        if (
458                            not forced_mode
459                            and (
460                                setting.getClass(),
461                                setting.getSubClass(),
462                                setting.getProtocol(),
463                            )
464                            != USB_BT_HCI_CLASS_TUPLE
465                        ):
466                            continue
467
468                        events_in = None
469                        acl_in = None
470                        acl_out = None
471                        for endpoint in setting:
472                            attributes = endpoint.getAttributes()
473                            address = endpoint.getAddress()
474                            if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
475                                if address & USB_ENDPOINT_IN and acl_in is None:
476                                    acl_in = address
477                                elif acl_out is None:
478                                    acl_out = address
479                            elif (
480                                attributes & 0x03
481                                == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT
482                            ):
483                                if address & USB_ENDPOINT_IN and events_in is None:
484                                    events_in = address
485
486                        # Return if we found all 3 endpoints
487                        if (
488                            acl_in is not None
489                            and acl_out is not None
490                            and events_in is not None
491                        ):
492                            return (
493                                configuration_index + 1,
494                                setting.getNumber(),
495                                setting.getAlternateSetting(),
496                                acl_in,
497                                acl_out,
498                                events_in,
499                            )
500
501                        logger.debug(
502                            f'skipping configuration {configuration_index + 1} / '
503                            f'interface {setting.getNumber()}'
504                        )
505
506            return None
507
508        endpoints = find_endpoints(found)
509        if endpoints is None:
510            raise ValueError('no compatible interface found for device')
511        (configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
512        logger.debug(
513            f'selected endpoints: configuration={configuration}, '
514            f'interface={interface}, '
515            f'setting={setting}, '
516            f'acl_in=0x{acl_in:02X}, '
517            f'acl_out=0x{acl_out:02X}, '
518            f'events_in=0x{events_in:02X}, '
519        )
520
521        device_metadata = {
522            'vendor_id': found.getVendorID(),
523            'product_id': found.getProductID(),
524        }
525        device = found.open()
526
527        # Auto-detach the kernel driver if supported
528        # pylint: disable=no-member
529        if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
530            try:
531                logger.debug('auto-detaching kernel driver')
532                device.setAutoDetachKernelDriver(True)
533            except usb1.USBError as error:
534                logger.warning(f'unable to auto-detach kernel driver: {error}')
535
536        # Set the configuration if needed
537        try:
538            current_configuration = device.getConfiguration()
539            logger.debug(f'current configuration = {current_configuration}')
540        except usb1.USBError:
541            current_configuration = 0
542
543        if current_configuration != configuration:
544            try:
545                logger.debug(f'setting configuration {configuration}')
546                device.setConfiguration(configuration)
547            except usb1.USBError:
548                logger.warning('failed to set configuration')
549
550        source = UsbPacketSource(device, device_metadata, acl_in, events_in)
551        sink = UsbPacketSink(device, acl_out)
552        return UsbTransport(context, device, interface, setting, source, sink)
553    except usb1.USBError as error:
554        logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
555        context.close()
556        raise
557