• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Detects attached Raspberry Pi Pico boards."""
16
17from dataclasses import asdict, dataclass
18import logging
19import os
20from pathlib import Path
21import platform
22import shutil
23import subprocess
24import sys
25from typing import Iterable, Optional, Sequence
26
27from ctypes.util import find_library as ctypes_find_library
28import serial.tools.list_ports
29import usb  # type: ignore
30from usb.backend import libusb1  # type: ignore
31
32import pw_cli.log
33from pw_cli.env import pigweed_environment
34
35_LOG = logging.getLogger('pi_pico_detector')
36
37# Vendor/device ID to search for in USB devices.
38_RASPBERRY_PI_VENDOR_ID = 0x2E8A
39
40# RaspberryPi Debug probe: https://github.com/raspberrypi/debugprobe
41_DEBUG_PROBE_DEVICE_ID = 0x000C
42
43_PICO_USB_SERIAL_DEVICE_ID = 0x000A
44_PICO_BOOTLOADER_DEVICE_ID = 0x0003
45_PICO_DEVICE_IDS = (
46    _PICO_USB_SERIAL_DEVICE_ID,
47    _PICO_BOOTLOADER_DEVICE_ID,
48)
49
50_ALL_DEVICE_IDS = (
51    _DEBUG_PROBE_DEVICE_ID,
52    _PICO_USB_SERIAL_DEVICE_ID,
53    _PICO_BOOTLOADER_DEVICE_ID,
54)
55
56_LIBUSB_CIPD_INSTALL_ENV_VAR = 'PW_PIGWEED_CIPD_INSTALL_DIR'
57_LIBUSB_CIPD_SUBDIR = 'libexec'
58
59if platform.system() == 'Linux':
60    _LIB_SUFFIX = '.so'
61elif platform.system() == 'Darwin':
62    _LIB_SUFFIX = '.dylib'
63elif platform.system() == 'Windows':
64    _LIB_SUFFIX = '.dll'
65else:
66    _LOG.error('Unsupported platform.system(): %s', platform.system())
67    sys.exit(1)
68
69
70def _custom_find_library(name: str) -> str | None:
71    """Search for shared libraries in non-standard locations."""
72    search_paths: list[Path] = []
73
74    # Add to search_paths starting with lowest priority locations.
75
76    if platform.system() == 'Darwin':
77        # libusb from homebrew
78        homebrew_prefix = os.environ.get('HOMEBREW_PREFIX', '')
79        if homebrew_prefix:
80            homebrew_lib = Path(homebrew_prefix) / 'lib'
81            homebrew_lib = homebrew_lib.expanduser().resolve()
82            if homebrew_lib.is_dir():
83                search_paths.append(homebrew_lib)
84
85    # libusb from pkg-config
86    pkg_config_bin = shutil.which('pkg-config')
87    if pkg_config_bin:
88        # pkg-config often prefixes libraries with 'lib', check both.
89        for pkg_name in [f'lib{name}', name]:
90            pkg_config_command = [pkg_config_bin, '--variable=libdir', pkg_name]
91            process = subprocess.run(
92                pkg_config_command,
93                stdout=subprocess.PIPE,
94                stderr=subprocess.STDOUT,
95            )
96            if process.returncode == 0:
97                pkg_config_libdir = Path(
98                    process.stdout.decode('utf-8', errors='ignore').strip()
99                )
100                if pkg_config_libdir.is_dir():
101                    search_paths.append(pkg_config_libdir)
102                    break
103
104    # libusb provided by CIPD:
105    pw_env = pigweed_environment()
106    if _LIBUSB_CIPD_INSTALL_ENV_VAR in pw_env:
107        cipd_lib = (
108            Path(getattr(pw_env, _LIBUSB_CIPD_INSTALL_ENV_VAR))
109            / _LIBUSB_CIPD_SUBDIR
110        )
111        if cipd_lib.is_dir():
112            search_paths.append(cipd_lib)
113
114    _LOG.debug('Potential shared library search paths:')
115    for path in search_paths:
116        _LOG.debug(path)
117
118    # Search for shared libraries in search_paths
119    for libdir in reversed(search_paths):
120        lib_results = sorted(
121            str(lib.resolve())
122            for lib in libdir.iterdir()
123            if name in lib.name and _LIB_SUFFIX in lib.suffixes
124        )
125        if lib_results:
126            _LOG.debug('Using %s located at: %s', name, lib_results[-1])
127            # Return the highest lexigraphically sorted lib version
128            return lib_results[-1]
129
130    # Fallback to pyusb default of calling ctypes.util.find_library.
131    return ctypes_find_library(name)
132
133
134def _device_port_path(device: usb.core.Device) -> str:
135    """Returns the chain of ports that represent where a device is attached.
136
137    Example:
138        2:2:1
139    """
140    return ":".join([str(port) for port in device.port_numbers])
141
142
143def _libusb_raspberry_pi_devices(
144    bus_filter: int | None,
145    port_filter: str | None,
146    include_picos: bool = True,
147    include_debug_probes: bool = True,
148) -> Iterable[usb.core.Device]:
149    """Finds all Pi Pico-related USB devices."""
150    devices_to_match: list[int] = []
151    if include_picos:
152        devices_to_match.extend(_PICO_DEVICE_IDS)
153    if include_debug_probes:
154        devices_to_match.append(_DEBUG_PROBE_DEVICE_ID)
155
156    def _custom_match(d: usb.core.Device):
157        if d.idVendor != _RASPBERRY_PI_VENDOR_ID:
158            return False
159        if d.idProduct not in devices_to_match:
160            return False
161        if bus_filter is not None and d.bus != bus_filter:
162            return False
163        if port_filter is not None and _device_port_path(d) != port_filter:
164            return False
165
166        return True
167
168    return usb.core.find(
169        find_all=True,
170        custom_match=_custom_match,
171        backend=libusb1.get_backend(find_library=_custom_find_library),
172    )
173
174
175@dataclass
176class PicoBoardInfo:
177    """Information about a connected Pi Pico board.
178
179    NOTE: As a Pico board is flashed and reset, the USB address can change.
180    Also, the bootloader has a different serial number than the regular
181    application firmware. For this reason, this object does NOT cache or store
182    the serial number, address, or USB device ID.
183    """
184
185    bus: int
186    port: str
187    serial_port: Optional[str]
188
189    def address(self) -> int:
190        """Queries this device for its USB address.
191
192        WARNING: This is not necessarily stable, and may change whenever the
193        board is reset or flashed.
194        """
195        for device in _libusb_raspberry_pi_devices(
196            bus_filter=self.bus, port_filter=self.port
197        ):
198            if device.idProduct not in _ALL_DEVICE_IDS:
199                _LOG.error(
200                    'Unknown device type on bus %s port %s', self.bus, self.port
201                )
202            if _device_port_path(device) == self.port:
203                return device.address
204        raise ValueError(
205            'No Pico found, it may have been disconnected or flashed with '
206            'an incompatible application'
207        )
208
209    @staticmethod
210    def vendor_id() -> int:
211        return _RASPBERRY_PI_VENDOR_ID
212
213    def is_debug_probe(self) -> bool:
214        return isinstance(self, PicoDebugProbeBoardInfo)
215
216
217@dataclass
218class PicoDebugProbeBoardInfo(PicoBoardInfo):
219    """Information about a connected Pi Debug Probe.
220
221    Unlike a Pi Pico, a Pi Debug Probe has a stable serial number and device ID.
222    """
223
224    serial_number: str
225
226    @staticmethod
227    def device_id() -> int:
228        return _DEBUG_PROBE_DEVICE_ID
229
230
231@dataclass
232class _BoardSerialInfo:
233    """Object that ties a serial number to a serial com port."""
234
235    serial_port: str
236    serial_number: str
237
238
239@dataclass
240class _BoardUsbInfo:
241    """Object that ties a serial number to other USB device information.
242
243    WARNING: This is private and ephemeral because many of these values are not
244    necessarily stable as a Pico is flashed and reset. Use PicoBoardInfo or
245    PicoDebugProbeBoardInfo for more stable representations of an attached Pico.
246    """
247
248    serial_number: str
249    bus: int
250    port: str
251    product: str
252    manufacturer: str
253    vendor_id: int
254    product_id: int
255
256    @property
257    def in_bootloader_mode(self) -> bool:
258        return self.product_id == _PICO_BOOTLOADER_DEVICE_ID
259
260    @property
261    def in_usb_serial_mode(self) -> bool:
262        return self.product_id == _PICO_USB_SERIAL_DEVICE_ID
263
264    @property
265    def is_debug_probe(self) -> bool:
266        return self.product_id == _DEBUG_PROBE_DEVICE_ID
267
268    def __repr__(self) -> str:
269        return repr(asdict(self))
270
271
272def _detect_pico_usb_info(
273    bus_filter: int | None = None,
274    port_filter: str | None = None,
275    include_picos: bool = True,
276    include_debug_probes: bool = True,
277) -> dict[str, _BoardUsbInfo]:
278    """Finds Raspberry Pi Pico devices and retrieves USB info for each one."""
279    boards: dict[str, _BoardUsbInfo] = {}
280    devices = _libusb_raspberry_pi_devices(
281        bus_filter=bus_filter,
282        port_filter=port_filter,
283        include_picos=include_picos,
284        include_debug_probes=include_debug_probes,
285    )
286
287    if not devices:
288        return boards
289
290    _LOG.debug('==> Detecting Raspberry Pi devices')
291    for device in devices:
292        try:
293            serial_number = device.serial_number
294        except ValueError as e:
295            _LOG.warning(
296                '  --> A connected device has an inaccessible '
297                'serial number: %s',
298                e,
299            )
300            continue
301
302        board_usb_info = _BoardUsbInfo(
303            serial_number=serial_number,
304            bus=device.bus,
305            port=_device_port_path(device),
306            product=device.product,
307            manufacturer=device.manufacturer,
308            vendor_id=device.idVendor,
309            product_id=device.idProduct,
310        )
311
312        if board_usb_info.in_usb_serial_mode:
313            boards[serial_number] = board_usb_info
314            _LOG.debug(
315                '  --> Found a Pi Pico in USB serial mode: %s', board_usb_info
316            )
317
318        elif board_usb_info.in_bootloader_mode:
319            boards[serial_number] = board_usb_info
320            _LOG.debug(
321                '  --> Found a Pi Pico in bootloader mode: %s', board_usb_info
322            )
323
324        elif board_usb_info.is_debug_probe:
325            boards[serial_number] = board_usb_info
326            _LOG.debug(
327                '  --> Found Raspberry Pi debug probe: %s', board_usb_info
328            )
329
330        else:
331            _LOG.warning(
332                '  --> Found unknown/incompatible Raspberry Pi: %s',
333                board_usb_info,
334            )
335
336    return boards
337
338
339def _detect_pico_serial_ports() -> dict[str, _BoardSerialInfo]:
340    """Finds the serial com port associated with each Raspberry Pi Pico."""
341    boards = {}
342    all_devs = serial.tools.list_ports.comports()
343    for dev in all_devs:
344        if dev.vid == _RASPBERRY_PI_VENDOR_ID and (
345            dev.pid == _PICO_USB_SERIAL_DEVICE_ID
346            or dev.pid == _DEBUG_PROBE_DEVICE_ID
347        ):
348            serial_number = dev.serial_number
349            if serial_number is None:
350                _LOG.error('Found pico with no serial number')
351                continue
352            boards[serial_number] = _BoardSerialInfo(
353                serial_port=dev.device,
354                serial_number=serial_number,
355            )
356    return boards
357
358
359def board_from_usb_port(
360    bus: int,
361    port: str,
362    include_picos: bool = True,
363    include_debug_probes: bool = True,
364) -> PicoDebugProbeBoardInfo | PicoBoardInfo:
365    """Retrieves board info for the Pico at the specified USB bus and port.
366
367    Args:
368        bus: The USB bus that the requested Pico resides on.
369        port: The chain of ports as a colon separated list of integers (e.g.
370            '1:4:2:2') that the requested Pico resides on. This only performs
371            exact matches.
372
373    Returns:
374      The board at the requested bus/port.
375    """
376    serial_devices = _detect_pico_serial_ports()
377    pico_usb_info = _detect_pico_usb_info(
378        include_picos=include_picos,
379        include_debug_probes=include_debug_probes,
380        bus_filter=bus,
381        port_filter=port,
382    )
383
384    if not pico_usb_info:
385        raise ValueError(f'No matching device found on bus {bus} port {port}')
386
387    if len(pico_usb_info) > 1:
388        raise ValueError(f'Multiple Picos found on bus {bus} port {port}')
389
390    usb_info = next(iter(pico_usb_info.values()))
391
392    serial_port = None
393    if usb_info.serial_number in serial_devices:
394        serial_port = serial_devices[usb_info.serial_number].serial_port
395
396    if usb_info.is_debug_probe:
397        return PicoDebugProbeBoardInfo(
398            bus=usb_info.bus,
399            port=usb_info.port,
400            serial_port=serial_port,
401            serial_number=usb_info.serial_number,
402        )
403
404    return PicoBoardInfo(
405        bus=usb_info.bus,
406        port=usb_info.port,
407        serial_port=serial_port,
408    )
409
410
411def detect_boards(
412    include_picos: bool = True, include_debug_probes: bool = False
413) -> Sequence[PicoBoardInfo | PicoDebugProbeBoardInfo]:
414    """Detects attached Raspberry Pi Pico boards in USB serial mode.
415
416    Args:
417        include_picos: Whether or not to include detected Raspberry Pi Picos in
418            the list of enumerated devices.
419        include_debug_probes: Whether or not to include detected Raspberry Pi
420            debug probes in the list of enumerated devices.
421
422    Returns:
423      A list of all found boards.
424    """
425    serial_devices = _detect_pico_serial_ports()
426    pico_usb_info = _detect_pico_usb_info(
427        include_picos=include_picos,
428        include_debug_probes=include_debug_probes,
429    )
430    boards: list[PicoBoardInfo | PicoDebugProbeBoardInfo] = []
431    for serial_number, usb_info in pico_usb_info.items():
432        if not include_debug_probes and usb_info.is_debug_probe:
433            continue
434
435        if not include_picos and not usb_info.is_debug_probe:
436            continue
437
438        serial_port = None
439        if serial_number in serial_devices:
440            serial_port = serial_devices[serial_number].serial_port
441
442        if usb_info.is_debug_probe:
443            boards.append(
444                PicoDebugProbeBoardInfo(
445                    bus=usb_info.bus,
446                    port=usb_info.port,
447                    serial_port=serial_port,
448                    serial_number=usb_info.serial_number,
449                )
450            )
451        elif serial_port or usb_info.in_bootloader_mode:
452            boards.append(
453                PicoBoardInfo(
454                    bus=usb_info.bus,
455                    port=usb_info.port,
456                    serial_port=serial_port,
457                )
458            )
459
460    return boards
461
462
463def main():
464    """Detects and then prints all attached Raspberry Pi Picos."""
465    pw_cli.log.install(
466        level=logging.DEBUG, use_color=True, hide_timestamp=False
467    )
468
469    boards = detect_boards(include_picos=True, include_debug_probes=True)
470    if not boards:
471        _LOG.info('No attached boards detected')
472    for idx, board in enumerate(boards):
473        _LOG.info('Board %d:', idx)
474        _LOG.info('  %s', board)
475
476
477if __name__ == '__main__':
478    main()
479