• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Support for Realtek USB dongles.
16Based on various online bits of information, including the Linux kernel.
17(see `drivers/bluetooth/btrtl.c`)
18"""
19
20# -----------------------------------------------------------------------------
21# Imports
22# -----------------------------------------------------------------------------
23from dataclasses import dataclass
24import asyncio
25import enum
26import logging
27import math
28import os
29import pathlib
30import platform
31import struct
32from typing import Tuple
33import weakref
34
35
36from bumble.hci import (
37    hci_vendor_command_op_code,
38    STATUS_SPEC,
39    HCI_SUCCESS,
40    HCI_Command,
41    HCI_Reset_Command,
42    HCI_Read_Local_Version_Information_Command,
43)
44from bumble.drivers import common
45
46# -----------------------------------------------------------------------------
47# Logging
48# -----------------------------------------------------------------------------
49logger = logging.getLogger(__name__)
50
51
52# -----------------------------------------------------------------------------
53# Constants
54# -----------------------------------------------------------------------------
# ROM identifiers for the supported chip families. `Driver.find_driver_info`
# compares these against the LMP subversion reported by the controller.
RTK_ROM_LMP_8723A = 0x1200
RTK_ROM_LMP_8723B = 0x8723
RTK_ROM_LMP_8821A = 0x8821
RTK_ROM_LMP_8761A = 0x8761
RTK_ROM_LMP_8822B = 0x8822
RTK_ROM_LMP_8852A = 0x8852
# NOTE(review): not referenced in this module; presumably the magic number
# found at the start of config files - confirm before relying on it.
RTK_CONFIG_MAGIC = 0x8723AB55

# Byte string that an epatch firmware image must start with.
RTK_EPATCH_SIGNATURE = b"Realtech"

# Size, in bytes, of the payload carried by one HCI_RTK_Download_Command.
RTK_FRAGMENT_LENGTH = 252

# Name of the environment variable that, when set, is the only directory
# searched for firmware/config files.
RTK_FIRMWARE_DIR_ENV = "BUMBLE_RTK_FIRMWARE_DIR"
# Directory checked for firmware/config files when running on Linux.
RTK_LINUX_FIRMWARE_DIR = "/lib/firmware/rtl_bt"
69
70
class RtlProjectId(enum.IntEnum):
    """
    Symbolic names for Realtek project IDs.

    The numeric values are the same as the keys of `RTK_PROJECT_ID_TO_ROM`.
    NOTE(review): presumably these are the values that `Firmware.project_id`
    can take - confirm against the firmware format.
    """

    PROJECT_ID_8723A = 0
    PROJECT_ID_8723B = 1
    PROJECT_ID_8821A = 2
    PROJECT_ID_8761A = 3
    PROJECT_ID_8822B = 8
    PROJECT_ID_8723D = 9
    PROJECT_ID_8821C = 10
    PROJECT_ID_8822C = 13
    PROJECT_ID_8761B = 14
    PROJECT_ID_8852A = 18
    PROJECT_ID_8852B = 20
    PROJECT_ID_8852C = 25
84
85
# Maps a project ID (see `RtlProjectId` for the symbolic names) to the ROM
# identifier of the chip family it belongs to. Several projects share a ROM.
RTK_PROJECT_ID_TO_ROM = {
    0: RTK_ROM_LMP_8723A,  # 8723A
    1: RTK_ROM_LMP_8723B,  # 8723B
    2: RTK_ROM_LMP_8821A,  # 8821A
    3: RTK_ROM_LMP_8761A,  # 8761A
    8: RTK_ROM_LMP_8822B,  # 8822B
    9: RTK_ROM_LMP_8723B,  # 8723D
    10: RTK_ROM_LMP_8821A,  # 8821C
    13: RTK_ROM_LMP_8822B,  # 8822C
    14: RTK_ROM_LMP_8761A,  # 8761B
    18: RTK_ROM_LMP_8852A,  # 8852A
    20: RTK_ROM_LMP_8852A,  # 8852B
    25: RTK_ROM_LMP_8852A,  # 8852C
}
100
# Set of USB (VendorID, ProductID) pairs for Realtek-based devices.
# `Driver.check` uses it for membership tests. Because this is a set literal,
# pairs that appear under more than one chip heading below collapse into a
# single entry.
RTK_USB_PRODUCTS = {
    # Realtek 8723AE
    (0x0930, 0x021D),
    (0x13D3, 0x3394),
    # Realtek 8723BE
    (0x0489, 0xE085),
    (0x0489, 0xE08B),
    (0x04F2, 0xB49F),
    (0x13D3, 0x3410),
    (0x13D3, 0x3416),
    (0x13D3, 0x3459),
    (0x13D3, 0x3494),
    # Realtek 8723BU
    (0x7392, 0xA611),
    # Realtek 8723DE
    (0x0BDA, 0xB009),
    (0x2FF8, 0xB011),
    # Realtek 8761BUV
    (0x0B05, 0x190E),
    (0x0BDA, 0x8771),
    (0x2230, 0x0016),
    (0x2357, 0x0604),
    (0x2550, 0x8761),
    (0x2B89, 0x8761),
    (0x7392, 0xC611),
    (0x0BDA, 0x877B),
    # Realtek 8821AE
    (0x0B05, 0x17DC),
    (0x13D3, 0x3414),
    (0x13D3, 0x3458),
    (0x13D3, 0x3461),
    (0x13D3, 0x3462),
    # Realtek 8821CE
    (0x0BDA, 0xB00C),
    (0x0BDA, 0xC822),
    (0x13D3, 0x3529),
    # Realtek 8822BE
    (0x0B05, 0x185C),
    (0x13D3, 0x3526),
    # Realtek 8822CE
    (0x04C5, 0x161F),
    (0x04CA, 0x4005),
    (0x0B05, 0x18EF),
    (0x0BDA, 0xB00C),
    (0x0BDA, 0xC123),
    (0x0BDA, 0xC822),
    (0x0CB5, 0xC547),
    (0x1358, 0xC123),
    (0x13D3, 0x3548),
    (0x13D3, 0x3549),
    (0x13D3, 0x3553),
    (0x13D3, 0x3555),
    (0x2FF8, 0x3051),
    # Realtek 8822CU
    (0x13D3, 0x3549),
    # Realtek 8852AE
    (0x04C5, 0x165C),
    (0x04CA, 0x4006),
    (0x0BDA, 0x2852),
    (0x0BDA, 0x385A),
    (0x0BDA, 0x4852),
    (0x0BDA, 0xC852),
    (0x0CB8, 0xC549),
    # Realtek 8852BE
    (0x0BDA, 0x887B),
    (0x0CB8, 0xC559),
    (0x13D3, 0x3571),
    # Realtek 8852CE
    (0x04C5, 0x1675),
    (0x04CA, 0x4007),
    (0x0CB8, 0xC558),
    (0x13D3, 0x3586),
    (0x13D3, 0x3587),
    (0x13D3, 0x3592),
}
177
178# -----------------------------------------------------------------------------
179# HCI Commands
180# -----------------------------------------------------------------------------
# Vendor-specific op codes used by this driver.
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66)
# Must run after the constants above are defined: the module globals are
# handed to the HCI layer so it can pick up the op codes declared here.
HCI_Command.register_commands(globals())
185
186
@HCI_Command.command(return_parameters_fields=[("status", STATUS_SPEC), ("version", 1)])
class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
    """
    Vendor command used to read the ROM version.

    Declared with no command fields; the return parameters are a status and
    a 1-byte `version`.
    """

    pass
190
191
@HCI_Command.command(
    fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
    return_parameters_fields=[("status", STATUS_SPEC), ("index", 1)],
)
class HCI_RTK_Download_Command(HCI_Command):
    """
    Vendor command used to download one firmware fragment.

    Fields are a 1-byte `index` and a `payload` declared with a size of
    `RTK_FRAGMENT_LENGTH`; the return parameters are a status and a 1-byte
    `index`.
    """

    pass
198
199
@HCI_Command.command()
class HCI_RTK_Drop_Firmware_Command(HCI_Command):
    """
    Vendor command with no fields, sent by `Driver.drop_firmware`.

    `Driver.drop_firmware` sends it without waiting for a response.
    """

    pass
203
204
205# -----------------------------------------------------------------------------
class Firmware:
    """
    Parsed view of a Realtek "epatch" firmware image.

    Attributes:
        project_id: project ID found in the instruction list at the end of
            the image.
        version: 32-bit firmware version read from the image header.
        patches: list of `(chip_id, payload, svn_version)` tuples, one per
            entry of the patch tables. `payload` is the patch data with its
            last 4 bytes replaced by `version`.
    """

    # Trailer that every epatch image must end with.
    EXTENSION_SIGNATURE = bytes([0x51, 0x04, 0xFD, 0x77])

    # Signature (8 bytes) + version (4 bytes) + number of patches (2 bytes).
    EPATCH_HEADER_SIZE = 14

    # A patch ends with a 4-byte SVN version followed by a 4-byte slot that is
    # overwritten with the firmware version, so it can't be shorter than this.
    MIN_PATCH_LENGTH = 8

    def __init__(self, firmware):
        """
        Parse a firmware image.

        Args:
            firmware: the complete image, as `bytes`.

        Raises:
            ValueError: if the image doesn't have the expected signatures, is
                truncated, contains an invalid instruction or patch, or
                doesn't declare a project ID.
        """
        if not firmware.startswith(RTK_EPATCH_SIGNATURE):
            raise ValueError("Firmware does not start with epatch signature")

        if not firmware.endswith(self.EXTENSION_SIGNATURE):
            raise ValueError("Firmware does not end with extension sig")

        if len(firmware) < self.EPATCH_HEADER_SIZE:
            raise ValueError("Firmware too short")

        self.project_id = self._find_project_id(firmware)

        # Read the patch tables info.
        self.version, num_patches = struct.unpack("<IH", firmware[8:14])
        self.patches = []

        # The patches tables are laid out as:
        # <ChipID_1><ChipID_2>...<ChipID_N>  (16 bits each)
        # <PatchLength_1><PatchLength_2>...<PatchLength_N> (16 bits each)
        # <PatchOffset_1><PatchOffset_2>...<PatchOffset_N> (32 bits each)
        if self.EPATCH_HEADER_SIZE + 8 * num_patches > len(firmware):
            raise ValueError("Firmware too short")
        chip_id_table_offset = self.EPATCH_HEADER_SIZE
        patch_length_table_offset = chip_id_table_offset + 2 * num_patches
        patch_offset_table_offset = chip_id_table_offset + 4 * num_patches
        for patch_index in range(num_patches):
            (chip_id,) = struct.unpack_from(
                "<H", firmware, chip_id_table_offset + 2 * patch_index
            )
            (patch_length,) = struct.unpack_from(
                "<H", firmware, patch_length_table_offset + 2 * patch_index
            )
            (patch_offset,) = struct.unpack_from(
                "<I", firmware, patch_offset_table_offset + 4 * patch_index
            )
            if patch_offset + patch_length > len(firmware):
                raise ValueError("Firmware too short")

            # Without this check, a short patch would make the SVN version be
            # read from before the start of the patch (or, with a negative
            # offset, from the end of the image) instead of being rejected.
            if patch_length < self.MIN_PATCH_LENGTH:
                raise ValueError("Patch too short")

            # Get the SVN version for the patch
            patch_end = patch_offset + patch_length
            (svn_version,) = struct.unpack_from("<I", firmware, patch_end - 8)

            # Create a payload with the patch, replacing the last 4 bytes with
            # the firmware version.
            payload = firmware[patch_offset : patch_end - 4] + struct.pack(
                "<I", self.version
            )
            self.patches.append((chip_id, payload, svn_version))

    @classmethod
    def _find_project_id(cls, firmware):
        """
        Scan the instructions at the end of the image for the project ID.

        Instructions are read backwards, starting just before the extension
        signature. Each one ends with a `<length><opcode>` byte pair, preceded
        by `length` bytes of data.

        Raises:
            ValueError: if an instruction has a length of 0, or if no project
                ID instruction is found.
        """
        offset = len(firmware) - len(cls.EXTENSION_SIGNATURE)
        while offset >= cls.EPATCH_HEADER_SIZE:
            length, opcode = firmware[offset - 2 : offset]
            offset -= 2

            if opcode == 0xFF:
                # End of the instruction list.
                break

            if length == 0:
                raise ValueError("Invalid 0-length instruction")

            if opcode == 0 and length == 1:
                # Opcode 0 with a single data byte carries the project ID.
                return firmware[offset - 1]

            # Skip over this instruction's data.
            offset -= length

        raise ValueError("Project ID not found")
286
287
288class Driver(common.Driver):
    @dataclass
    class DriverInfo:
        """Static description of one supported chip variant."""

        # ROM identifier, matched against the controller's LMP subversion.
        rom: int
        # (HCI subversion, HCI version) pair, matched against the controller's
        # local version information.
        hci: Tuple[int, int]
        # When True, `for_host` refuses to create a driver without a config.
        config_needed: bool
        # When True, the ROM version is read before selecting a patch.
        has_rom_version: bool
        # NOTE(review): not read anywhere in this module; presumably signals
        # support for the Microsoft vendor extension - confirm.
        has_msft_ext: bool = False
        # File name of the firmware image to look for.
        fw_name: str = ""
        # File name of the config to look for ("" when there is none).
        config_name: str = ""
298
    # Known chip variants. `find_driver_info` returns the first entry whose
    # `rom` and `hci` values both match, so the order only matters if two
    # entries share both values.
    DRIVER_INFOS = [
        # 8723A
        DriverInfo(
            rom=RTK_ROM_LMP_8723A,
            hci=(0x0B, 0x06),
            config_needed=False,
            has_rom_version=False,
            fw_name="rtl8723a_fw.bin",
            config_name="",
        ),
        # 8723B
        DriverInfo(
            rom=RTK_ROM_LMP_8723B,
            hci=(0x0B, 0x06),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8723b_fw.bin",
            config_name="rtl8723b_config.bin",
        ),
        # 8723D
        DriverInfo(
            rom=RTK_ROM_LMP_8723B,
            hci=(0x0D, 0x08),
            config_needed=True,
            has_rom_version=True,
            fw_name="rtl8723d_fw.bin",
            config_name="rtl8723d_config.bin",
        ),
        # 8821A
        DriverInfo(
            rom=RTK_ROM_LMP_8821A,
            hci=(0x0A, 0x06),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8821a_fw.bin",
            config_name="rtl8821a_config.bin",
        ),
        # 8821C
        DriverInfo(
            rom=RTK_ROM_LMP_8821A,
            hci=(0x0C, 0x08),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8821c_fw.bin",
            config_name="rtl8821c_config.bin",
        ),
        # 8761A
        DriverInfo(
            rom=RTK_ROM_LMP_8761A,
            hci=(0x0A, 0x06),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8761a_fw.bin",
            config_name="rtl8761a_config.bin",
        ),
        # 8761BU
        DriverInfo(
            rom=RTK_ROM_LMP_8761A,
            hci=(0x0B, 0x0A),
            config_needed=False,
            has_rom_version=True,
            fw_name="rtl8761bu_fw.bin",
            config_name="rtl8761bu_config.bin",
        ),
        # 8822C
        DriverInfo(
            rom=RTK_ROM_LMP_8822B,
            hci=(0x0C, 0x0A),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8822cu_fw.bin",
            config_name="rtl8822cu_config.bin",
        ),
        # 8822B
        DriverInfo(
            rom=RTK_ROM_LMP_8822B,
            hci=(0x0B, 0x07),
            config_needed=True,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8822b_fw.bin",
            config_name="rtl8822b_config.bin",
        ),
        # 8852A
        DriverInfo(
            rom=RTK_ROM_LMP_8852A,
            hci=(0x0A, 0x0B),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8852au_fw.bin",
            config_name="rtl8852au_config.bin",
        ),
        # 8852B
        DriverInfo(
            rom=RTK_ROM_LMP_8852A,
            hci=(0xB, 0xB),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8852bu_fw.bin",
            config_name="rtl8852bu_config.bin",
        ),
        # 8852C
        DriverInfo(
            rom=RTK_ROM_LMP_8852A,
            hci=(0x0C, 0x0C),
            config_needed=False,
            has_rom_version=True,
            has_msft_ext=True,
            fw_name="rtl8852cu_fw.bin",
            config_name="rtl8852cu_config.bin",
        ),
    ]

    # Time, in seconds, that `drop_firmware` sleeps after sending the command.
    POST_DROP_DELAY = 0.2
417
418    @staticmethod
419    def find_driver_info(hci_version, hci_subversion, lmp_subversion):
420        for driver_info in Driver.DRIVER_INFOS:
421            if driver_info.rom == lmp_subversion and driver_info.hci == (
422                hci_subversion,
423                hci_version,
424            ):
425                return driver_info
426
427        return None
428
429    @staticmethod
430    def find_binary_path(file_name):
431        # First check if an environment variable is set
432        if RTK_FIRMWARE_DIR_ENV in os.environ:
433            if (
434                path := pathlib.Path(os.environ[RTK_FIRMWARE_DIR_ENV]) / file_name
435            ).is_file():
436                logger.debug(f"{file_name} found in env dir")
437                return path
438
439            # When the environment variable is set, don't look elsewhere
440            return None
441
442        # Then, look where the firmware download tool writes by default
443        if (path := rtk_firmware_dir() / file_name).is_file():
444            logger.debug(f"{file_name} found in project data dir")
445            return path
446
447        # Then, look in the package's driver directory
448        if (path := pathlib.Path(__file__).parent / "rtk_fw" / file_name).is_file():
449            logger.debug(f"{file_name} found in package dir")
450            return path
451
452        # On Linux, check the system's FW directory
453        if (
454            platform.system() == "Linux"
455            and (path := pathlib.Path(RTK_LINUX_FIRMWARE_DIR) / file_name).is_file()
456        ):
457            logger.debug(f"{file_name} found in Linux system FW dir")
458            return path
459
460        # Finally look in the current directory
461        if (path := pathlib.Path.cwd() / file_name).is_file():
462            logger.debug(f"{file_name} found in CWD")
463            return path
464
465        return None
466
467    @staticmethod
468    def check(host):
469        if not host.hci_metadata:
470            logger.debug("USB metadata not found")
471            return False
472
473        if host.hci_metadata.get('driver') == 'rtk':
474            # Forced driver
475            return True
476
477        vendor_id = host.hci_metadata.get("vendor_id")
478        product_id = host.hci_metadata.get("product_id")
479        if vendor_id is None or product_id is None:
480            logger.debug("USB metadata not sufficient")
481            return False
482
483        if (vendor_id, product_id) not in RTK_USB_PRODUCTS:
484            logger.debug(
485                f"USB device ({vendor_id:04X}, {product_id:04X}) " "not in known list"
486            )
487            return False
488
489        return True
490
491    @classmethod
492    async def driver_info_for_host(cls, host):
493        await host.send_command(HCI_Reset_Command(), check_result=True)
494        host.ready = True  # Needed to let the host know the controller is ready.
495
496        response = await host.send_command(
497            HCI_Read_Local_Version_Information_Command(), check_result=True
498        )
499        local_version = response.return_parameters
500
501        logger.debug(
502            f"looking for a driver: 0x{local_version.lmp_subversion:04X} "
503            f"(0x{local_version.hci_version:02X}, "
504            f"0x{local_version.hci_subversion:04X})"
505        )
506
507        driver_info = cls.find_driver_info(
508            local_version.hci_version,
509            local_version.hci_subversion,
510            local_version.lmp_subversion,
511        )
512        if driver_info is None:
513            # TODO: it seems that the Linux driver will send command (0x3f, 0x66)
514            # in this case and then re-read the local version, then re-match.
515            logger.debug("firmware already loaded or no known driver for this device")
516
517        return driver_info
518
519    @classmethod
520    async def for_host(cls, host, force=False):
521        # Check that a driver is needed for this host
522        if not force and not cls.check(host):
523            return None
524
525        # Get the driver info
526        driver_info = await cls.driver_info_for_host(host)
527        if driver_info is None:
528            return None
529
530        # Load the firmware
531        firmware_path = cls.find_binary_path(driver_info.fw_name)
532        if not firmware_path:
533            logger.warning(f"Firmware file {driver_info.fw_name} not found")
534            logger.warning("See https://google.github.io/bumble/drivers/realtek.html")
535            return None
536        with open(firmware_path, "rb") as firmware_file:
537            firmware = firmware_file.read()
538
539        # Load the config
540        config = None
541        if driver_info.config_name:
542            config_path = cls.find_binary_path(driver_info.config_name)
543            if config_path:
544                with open(config_path, "rb") as config_file:
545                    config = config_file.read()
546        if driver_info.config_needed and not config:
547            logger.warning("Config needed, but no config file available")
548            return None
549
550        return cls(host, driver_info, firmware, config)
551
552    def __init__(self, host, driver_info, firmware, config):
553        self.host = weakref.proxy(host)
554        self.driver_info = driver_info
555        self.firmware = firmware
556        self.config = config
557
558    @staticmethod
559    async def drop_firmware(host):
560        host.send_hci_packet(HCI_RTK_Drop_Firmware_Command())
561
562        # Wait for the command to be effective (no response is sent)
563        await asyncio.sleep(Driver.POST_DROP_DELAY)
564
565    async def download_for_rtl8723a(self):
566        # Check that the firmware image does not include an epatch signature.
567        if RTK_EPATCH_SIGNATURE in self.firmware:
568            logger.warning(
569                "epatch signature found in firmware, it is probably the wrong firmware"
570            )
571            return
572
573        # TODO: load the firmware
574
575    async def download_for_rtl8723b(self):
576        if self.driver_info.has_rom_version:
577            response = await self.host.send_command(
578                HCI_RTK_Read_ROM_Version_Command(), check_result=True
579            )
580            if response.return_parameters.status != HCI_SUCCESS:
581                logger.warning("can't get ROM version")
582                return
583            rom_version = response.return_parameters.version
584            logger.debug(f"ROM version before download: {rom_version:04X}")
585        else:
586            rom_version = 0
587
588        firmware = Firmware(self.firmware)
589        logger.debug(f"firmware: project_id=0x{firmware.project_id:04X}")
590        for patch in firmware.patches:
591            if patch[0] == rom_version + 1:
592                logger.debug(f"using patch {patch[0]}")
593                break
594        else:
595            logger.warning("no valid patch found for rom version {rom_version}")
596            return
597
598        # Append the config if there is one.
599        if self.config:
600            payload = patch[1] + self.config
601        else:
602            payload = patch[1]
603
604        # Download the payload, one fragment at a time.
605        fragment_count = math.ceil(len(payload) / RTK_FRAGMENT_LENGTH)
606        for fragment_index in range(fragment_count):
607            # NOTE: the Linux driver somehow adds 1 to the index after it wraps around.
608            # That's odd, but we"ll do the same here.
609            download_index = fragment_index & 0x7F
610            if download_index >= 0x80:
611                download_index += 1
612            if fragment_index == fragment_count - 1:
613                download_index |= 0x80  # End marker.
614            fragment_offset = fragment_index * RTK_FRAGMENT_LENGTH
615            fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
616            logger.debug(f"downloading fragment {fragment_index}")
617            await self.host.send_command(
618                HCI_RTK_Download_Command(
619                    index=download_index, payload=fragment, check_result=True
620                )
621            )
622
623        logger.debug("download complete!")
624
625        # Read the version again
626        response = await self.host.send_command(
627            HCI_RTK_Read_ROM_Version_Command(), check_result=True
628        )
629        if response.return_parameters.status != HCI_SUCCESS:
630            logger.warning("can't get ROM version")
631        else:
632            rom_version = response.return_parameters.version
633            logger.debug(f"ROM version after download: {rom_version:04X}")
634
635    async def download_firmware(self):
636        if self.driver_info.rom == RTK_ROM_LMP_8723A:
637            return await self.download_for_rtl8723a()
638
639        if self.driver_info.rom in (
640            RTK_ROM_LMP_8723B,
641            RTK_ROM_LMP_8821A,
642            RTK_ROM_LMP_8761A,
643            RTK_ROM_LMP_8822B,
644            RTK_ROM_LMP_8852A,
645        ):
646            return await self.download_for_rtl8723b()
647
648        raise ValueError("ROM not supported")
649
650    async def init_controller(self):
651        await self.download_firmware()
652        await self.host.send_command(HCI_Reset_Command(), check_result=True)
653        logger.info(f"loaded FW image {self.driver_info.fw_name}")
654
655
def rtk_firmware_dir() -> pathlib.Path:
    """
    Get the directory used to store Realtek firmware files.

    Returns:
        The `firmware/realtek` subdirectory of the project data dir. It is
        created, along with any missing parents, if it doesn't exist yet.
    """
    # NOTE(review): imported here rather than at module level, presumably to
    # avoid a circular import with `bumble.drivers` - confirm.
    from bumble.drivers import project_data_dir

    firmware_dir = project_data_dir().joinpath("firmware", "realtek")
    firmware_dir.mkdir(parents=True, exist_ok=True)
    return firmware_dir
667