• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Detects attached Raspberry Pi Pico boards."""
16
17from dataclasses import asdict, dataclass
18import logging
19import os
20from pathlib import Path
21import platform
22import shutil
23import subprocess
24import sys
25from typing import Iterable, Optional, Sequence
26
27from ctypes.util import find_library as ctypes_find_library
28import serial.tools.list_ports
29import usb  # type: ignore
30from usb.backend import libusb1  # type: ignore
31
32import pw_cli.log
33from pw_cli.env import pigweed_environment
34
35_LOG = logging.getLogger('pi_pico_detector')
36
# Vendor ID that a USB device must report to be considered at all.
_RASPBERRY_PI_VENDOR_ID = 0x2E8A

# Product ID of the Raspberry Pi Debug Probe:
# https://github.com/raspberrypi/debugprobe
_DEBUG_PROBE_DEVICE_ID = 0x000C

# Product IDs for boards in USB serial mode and in bootloader mode.
_RP2040_USB_SERIAL_DEVICE_ID = 0x000A
_RP2040_BOOTLOADER_DEVICE_ID = 0x0003
_PICO_USB_SERIAL_DEVICE_ID = 0x0009
_PICO_BOOTLOADER_DEVICE_ID = 0x000F

_PICO_USB_SERIAL_DEVICE_IDS = (
    _RP2040_USB_SERIAL_DEVICE_ID,
    _PICO_USB_SERIAL_DEVICE_ID,
)

_PICO_BOOTLOADER_DEVICE_IDS = (
    _RP2040_BOOTLOADER_DEVICE_ID,
    _PICO_BOOTLOADER_DEVICE_ID,
)

# Serial-mode IDs first, then bootloader-mode IDs.
_PICO_DEVICE_IDS = _PICO_USB_SERIAL_DEVICE_IDS + _PICO_BOOTLOADER_DEVICE_IDS

# The debug probe followed by every Pico product ID.
_ALL_DEVICE_IDS = (_DEBUG_PROBE_DEVICE_ID,) + _PICO_DEVICE_IDS

_LIBUSB_CIPD_INSTALL_ENV_VAR = 'PW_PIGWEED_CIPD_INSTALL_DIR'
_LIBUSB_CIPD_SUBDIR = 'libexec'

# Shared library file suffix for the host OS. Importing this module on any
# other OS logs an error and exits with status 1.
_LIB_SUFFIX = {
    'Linux': '.so',
    'Darwin': '.dylib',
    'Windows': '.dll',
}.get(platform.system(), '')
if not _LIB_SUFFIX:
    _LOG.error('Unsupported platform.system(): %s', platform.system())
    sys.exit(1)
81
82
def _custom_find_library(name: str) -> str | None:
    """Search for shared libraries in non-standard locations.

    Directories are gathered from lowest to highest priority: the Homebrew
    lib directory (macOS only), the libdir reported by pkg-config, the
    Pigweed CIPD install, and the Bazel runfiles tree. They are then searched
    from highest to lowest priority.

    Args:
        name: Library name. A file matches when its name contains this string
            and one of its suffixes is the platform's shared library suffix.

    Returns:
        The resolved path of a matching library from the highest priority
        directory that contains one (the lexicographically greatest path when
        several match). If no directory has a match, returns whatever
        ctypes.util.find_library gives for name, which may be None.
    """
    search_paths: list[Path] = []

    # Add to search_paths starting with lowest priority locations.

    if platform.system() == 'Darwin':
        # libusb from homebrew
        homebrew_prefix = os.environ.get('HOMEBREW_PREFIX', '')
        if homebrew_prefix:
            homebrew_lib = Path(homebrew_prefix) / 'lib'
            homebrew_lib = homebrew_lib.expanduser().resolve()
            if homebrew_lib.is_dir():
                search_paths.append(homebrew_lib)

    # libusb from pkg-config
    pkg_config_bin = shutil.which('pkg-config')
    if pkg_config_bin:
        # pkg-config often prefixes libraries with 'lib', check both.
        for pkg_name in [f'lib{name}', name]:
            pkg_config_command = [pkg_config_bin, '--variable=libdir', pkg_name]
            # Only stdout is parsed as a path, so diagnostics written to
            # stderr are discarded rather than mixed into it.
            process = subprocess.run(
                pkg_config_command,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            )
            if process.returncode == 0:
                pkg_config_libdir = Path(
                    process.stdout.decode('utf-8', errors='ignore').strip()
                )
                if pkg_config_libdir.is_dir():
                    search_paths.append(pkg_config_libdir)
                    break

    # libusb provided by CIPD:
    pw_env = pigweed_environment()
    if _LIBUSB_CIPD_INSTALL_ENV_VAR in pw_env:
        cipd_lib = (
            Path(getattr(pw_env, _LIBUSB_CIPD_INSTALL_ENV_VAR))
            / _LIBUSB_CIPD_SUBDIR
        )
        if cipd_lib.is_dir():
            search_paths.append(cipd_lib)

    # libusb provided by Bazel
    try:
        # pylint: disable=import-outside-toplevel
        from python.runfiles import runfiles  # type: ignore
    except ImportError:
        runfiles = None

    if runfiles is not None:
        r = runfiles.Create()
        libusb_path = None
        # Both Create() and Rlocation() may yield None (no runfiles
        # environment / unknown path); treat that as "not provided by Bazel".
        if r is not None:
            libusb_path = r.Rlocation(
                f'libusb/libusb-1.0{_LIB_SUFFIX}',
                r.CurrentRepository(),
            )
        if libusb_path:
            libusb_dir = Path(os.path.dirname(libusb_path))
            # Like every other location, only search directories that exist;
            # iterdir() below raises on a missing directory.
            if libusb_dir.is_dir():
                search_paths.append(libusb_dir)

    _LOG.debug('Potential shared library search paths:')
    for path in search_paths:
        _LOG.debug(path)

    # Search for shared libraries in search_paths, highest priority first.
    for libdir in reversed(search_paths):
        lib_results = sorted(
            str(lib.resolve())
            for lib in libdir.iterdir()
            if name in lib.name and _LIB_SUFFIX in lib.suffixes
        )
        if lib_results:
            _LOG.debug('Using %s located at: %s', name, lib_results[-1])
            # Return the highest lexicographically sorted lib version
            return lib_results[-1]

    # Fallback to pyusb default of calling ctypes.util.find_library.
    return ctypes_find_library(name)
161
162
def _device_port_path(device: usb.core.Device) -> str:
    """Returns the chain of ports that represent where a device is attached.

    Example:
        2:2:1

    Args:
        device: The USB device whose port_numbers are joined.

    Returns:
        The port numbers joined with ':'. This is an empty string when the
        device reports no port numbers (port_numbers is None or empty).
    """
    # port_numbers may be None when the backend cannot report the port chain;
    # fall back to an empty chain instead of raising TypeError.
    return ":".join(str(port) for port in device.port_numbers or ())
170
171
def _libusb_raspberry_pi_devices(
    bus_filter: int | None,
    port_filter: str | None,
    include_picos: bool = True,
    include_debug_probes: bool = True,
) -> Iterable[usb.core.Device]:
    """Looks up attached Raspberry Pi USB devices through libusb.

    Args:
        bus_filter: When not None, only devices on this USB bus match.
        port_filter: When not None, only devices whose port path equals this
            string match.
        include_picos: Whether Pico product IDs (USB serial and bootloader
            mode) are matched.
        include_debug_probes: Whether the debug probe product ID is matched.

    Returns:
        Whatever usb.core.find() yields for the matching devices.
    """
    wanted_product_ids: list[int] = []
    if include_picos:
        wanted_product_ids += _PICO_DEVICE_IDS
    if include_debug_probes:
        wanted_product_ids.append(_DEBUG_PROBE_DEVICE_ID)

    def _is_wanted(candidate: usb.core.Device):
        # Checks run in this order and stop at the first one that fails.
        return (
            candidate.idVendor == _RASPBERRY_PI_VENDOR_ID
            and candidate.idProduct in wanted_product_ids
            and (bus_filter is None or candidate.bus == bus_filter)
            and (
                port_filter is None
                or _device_port_path(candidate) == port_filter
            )
        )

    # Load libusb using this module's library search before querying devices.
    libusb_backend = libusb1.get_backend(find_library=_custom_find_library)
    return usb.core.find(
        find_all=True,
        custom_match=_is_wanted,
        backend=libusb_backend,
    )
202
203
@dataclass
class PicoBoardInfo:
    """Describes a Pi Pico board attached over USB.

    NOTE: Flashing or resetting a Pico can change its USB address, and the
    bootloader reports a different serial number than the regular application
    firmware does. Because of that, the serial number, address, and USB device
    ID are deliberately NOT cached or stored on this object.
    """

    bus: int
    port: str
    serial_port: Optional[str]
    manufacturer: Optional[str]
    product: Optional[str]

    def address(self) -> int:
        """Looks up the current USB address of this device.

        WARNING: The address is not necessarily stable; it may change whenever
        the board is reset or flashed.

        Returns:
            The address of the device found at this object's bus and port.

        Raises:
            ValueError: No device was found at this object's bus and port.
        """
        candidates = _libusb_raspberry_pi_devices(
            bus_filter=self.bus, port_filter=self.port
        )
        for usb_device in candidates:
            if usb_device.idProduct not in _ALL_DEVICE_IDS:
                _LOG.error(
                    'Unknown device type on bus %s port %s', self.bus, self.port
                )
            if _device_port_path(usb_device) != self.port:
                continue
            return usb_device.address

        raise ValueError(
            'No Pico found, it may have been disconnected or flashed with '
            'an incompatible application'
        )

    @staticmethod
    def vendor_id() -> int:
        """Returns the Raspberry Pi USB vendor ID."""
        return _RASPBERRY_PI_VENDOR_ID

    def is_debug_probe(self) -> bool:
        """Returns True if this object is a PicoDebugProbeBoardInfo."""
        return isinstance(self, PicoDebugProbeBoardInfo)
246
247
@dataclass
class PicoDebugProbeBoardInfo(PicoBoardInfo):
    """Describes an attached Pi Debug Probe.

    A Pi Debug Probe's serial number and device ID are stable, unlike those of
    a Pi Pico, so the serial number is stored on this object.
    """

    serial_number: str

    @staticmethod
    def device_id() -> int:
        """Returns the USB product ID of the Debug Probe."""
        return _DEBUG_PROBE_DEVICE_ID
260
261
@dataclass
class _BoardSerialInfo:
    """Pairs a serial com port with the serial number of its device."""

    # Name of the serial com port.
    serial_port: str
    # Serial number of the device that owns the port.
    serial_number: str
268
269
@dataclass
class _BoardUsbInfo:
    """Links a serial number to the rest of a device's USB information.

    WARNING: This type is private and short-lived because many of its values
    are not necessarily stable while a Pico is flashed and reset. For a more
    stable view of an attached Pico use PicoBoardInfo or
    PicoDebugProbeBoardInfo.
    """

    serial_number: str
    bus: int
    port: str
    product: str
    manufacturer: str
    vendor_id: int
    product_id: int

    @property
    def in_bootloader_mode(self) -> bool:
        """Whether the product ID is one of the bootloader IDs."""
        return self.product_id in _PICO_BOOTLOADER_DEVICE_IDS

    @property
    def in_usb_serial_mode(self) -> bool:
        """Whether the product ID is one of the USB serial IDs."""
        return self.product_id in _PICO_USB_SERIAL_DEVICE_IDS

    @property
    def is_debug_probe(self) -> bool:
        """Whether the product ID is the debug probe ID."""
        return self.product_id == _DEBUG_PROBE_DEVICE_ID

    def __repr__(self) -> str:
        # Render as a plain dict of the fields instead of the dataclass form.
        field_values = asdict(self)
        return repr(field_values)
301
302
def _detect_pico_usb_info(
    bus_filter: int | None = None,
    port_filter: str | None = None,
    include_picos: bool = True,
    include_debug_probes: bool = True,
) -> dict[str, _BoardUsbInfo]:
    """Collects USB details for each attached Raspberry Pi device.

    Args:
        bus_filter: When not None, only devices on this USB bus are examined.
        port_filter: When not None, only devices at this port path are
            examined.
        include_picos: Whether Pico boards are searched for.
        include_debug_probes: Whether debug probes are searched for.

    Returns:
        A mapping from serial number to the USB details of that device.
        Devices whose serial number lookup raises ValueError, or whose product
        ID is not recognized, are logged and left out.
    """
    found: dict[str, _BoardUsbInfo] = {}
    usb_devices = _libusb_raspberry_pi_devices(
        bus_filter=bus_filter,
        port_filter=port_filter,
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
    )

    if not usb_devices:
        return found

    _LOG.debug('==> Detecting Raspberry Pi devices')
    for usb_device in usb_devices:
        try:
            serial_number = usb_device.serial_number
        except ValueError as error:
            _LOG.warning(
                '  --> A connected device has an inaccessible '
                'serial number: %s',
                error,
            )
            continue

        info = _BoardUsbInfo(
            serial_number=serial_number,
            bus=usb_device.bus,
            port=_device_port_path(usb_device),
            product=usb_device.product,
            manufacturer=usb_device.manufacturer,
            vendor_id=usb_device.idVendor,
            product_id=usb_device.idProduct,
        )

        # Pick the log line for the kind of device; unknown kinds are only
        # reported and never recorded.
        check_probe_firmware = False
        if info.in_usb_serial_mode:
            found_message = '  --> Found a Pi Pico in USB serial mode: %s'
        elif info.in_bootloader_mode:
            found_message = '  --> Found a Pi Pico in bootloader mode: %s'
        elif info.is_debug_probe:
            found_message = '  --> Found Raspberry Pi debug probe: %s'
            check_probe_firmware = True
        else:
            _LOG.warning(
                '  --> Found unknown/incompatible Raspberry Pi: %s',
                info,
            )
            continue

        found[serial_number] = info
        _LOG.debug(found_message, info)

        if not check_probe_firmware:
            continue

        # bcdDevice carries the firmware version, one digit per nibble.
        firmware = usb_device.bcdDevice
        if firmware < 0x201:
            _LOG.error(
                'Reliable flashing and testing not possible due to '
                'outdated Debug Probe firmware (%d.%d.%d). Update to '
                'version 2.0.1 or later. See https://www.raspberrypi.com/'
                'documentation/microcontrollers/debug-probe.html for '
                'update instructions.',
                (firmware >> 8 & 0xF),
                (firmware >> 4 & 0xF),
                (firmware & 0xF),
            )

    return found
379
380
def _detect_pico_serial_ports() -> dict[str, _BoardSerialInfo]:
    """Maps device serial numbers to the serial com ports they expose.

    Only com ports with the Raspberry Pi vendor ID and either a Pico USB
    serial product ID or the debug probe product ID are considered.

    Returns:
        A mapping from serial number to that device's serial port info. Ports
        with no serial number are logged as errors and skipped.
    """
    ports_by_serial = {}
    for port_info in serial.tools.list_ports.comports():
        if port_info.vid != _RASPBERRY_PI_VENDOR_ID:
            continue

        is_pico_serial = port_info.pid in _PICO_USB_SERIAL_DEVICE_IDS
        if not (is_pico_serial or port_info.pid == _DEBUG_PROBE_DEVICE_ID):
            continue

        serial_number = port_info.serial_number
        if serial_number is None:
            _LOG.error('Found pico with no serial number')
            continue

        ports_by_serial[serial_number] = _BoardSerialInfo(
            serial_port=port_info.device,
            serial_number=serial_number,
        )
    return ports_by_serial
399
400
def board_from_usb_port(
    bus: int,
    port: str,
    include_picos: bool = True,
    include_debug_probes: bool = True,
) -> PicoDebugProbeBoardInfo | PicoBoardInfo:
    """Looks up the single board attached at a given USB bus and port.

    Args:
        bus: The USB bus that the requested Pico resides on.
        port: The chain of ports as a colon separated list of integers (e.g.
            '1:4:2:2') that the requested Pico resides on. This only performs
            exact matches.
        include_picos: Whether Pico boards are searched for.
        include_debug_probes: Whether debug probes are searched for.

    Returns:
      The board at the requested bus/port.

    Raises:
      ValueError: No device, or more than one device, was found at the
          requested bus/port.
    """
    serial_devices = _detect_pico_serial_ports()
    matches = _detect_pico_usb_info(
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
        bus_filter=bus,
        port_filter=port,
    )

    if not matches:
        raise ValueError(f'No matching device found on bus {bus} port {port}')
    if len(matches) > 1:
        raise ValueError(f'Multiple Picos found on bus {bus} port {port}')

    # Exactly one entry remains at this point.
    (usb_info,) = matches.values()

    # The serial port is optional: not every device has one listed.
    serial_entry = serial_devices.get(usb_info.serial_number)
    serial_port = None if serial_entry is None else serial_entry.serial_port

    if not usb_info.is_debug_probe:
        return PicoBoardInfo(
            bus=usb_info.bus,
            port=usb_info.port,
            serial_port=serial_port,
            product=usb_info.product,
            manufacturer=usb_info.manufacturer,
        )

    return PicoDebugProbeBoardInfo(
        bus=usb_info.bus,
        port=usb_info.port,
        serial_port=serial_port,
        serial_number=usb_info.serial_number,
        product=usb_info.product,
        manufacturer=usb_info.manufacturer,
    )
455
456
def detect_boards(
    include_picos: bool = True, include_debug_probes: bool = False
) -> Sequence[PicoBoardInfo | PicoDebugProbeBoardInfo]:
    """Detects attached Raspberry Pi Pico boards and debug probes.

    A Pico is reported when a serial port was found for it or when it is in
    bootloader mode. A debug probe is reported whether or not a serial port
    was found for it.

    Args:
        include_picos: Whether or not to include detected Raspberry Pi Picos in
            the list of enumerated devices.
        include_debug_probes: Whether or not to include detected Raspberry Pi
            debug probes in the list of enumerated devices.

    Returns:
      A list of all found boards.
    """
    serial_devices = _detect_pico_serial_ports()
    pico_usb_info = _detect_pico_usb_info(
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
    )

    detected: list[PicoBoardInfo | PicoDebugProbeBoardInfo] = []
    for serial_number, usb_info in pico_usb_info.items():
        is_probe = usb_info.is_debug_probe

        # Drop device kinds that the caller did not ask for.
        if is_probe and not include_debug_probes:
            continue
        if not is_probe and not include_picos:
            continue

        serial_entry = serial_devices.get(serial_number)
        serial_port = None if serial_entry is None else serial_entry.serial_port

        if is_probe:
            detected.append(
                PicoDebugProbeBoardInfo(
                    bus=usb_info.bus,
                    port=usb_info.port,
                    serial_port=serial_port,
                    serial_number=usb_info.serial_number,
                    product=usb_info.product,
                    manufacturer=usb_info.manufacturer,
                )
            )
            continue

        # A Pico with neither a serial port nor bootloader mode is skipped.
        if serial_port or usb_info.in_bootloader_mode:
            detected.append(
                PicoBoardInfo(
                    bus=usb_info.bus,
                    port=usb_info.port,
                    serial_port=serial_port,
                    product=usb_info.product,
                    manufacturer=usb_info.manufacturer,
                )
            )

    return detected
511
512
def main():
    """Logs every attached Raspberry Pi Pico and debug probe.

    Installs debug-level logging, runs detection with both Picos and debug
    probes included, and logs one entry per detected board.
    """
    pw_cli.log.install(
        level=logging.DEBUG, use_color=True, hide_timestamp=False
    )

    detected = detect_boards(include_picos=True, include_debug_probes=True)
    if not detected:
        _LOG.info('No attached boards detected')

    for index, board_info in enumerate(detected):
        _LOG.info('Board %d:', index)
        _LOG.info('  %s', board_info)


if __name__ == '__main__':
    main()
529