1#!/usr/bin/env python3 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Detects attached Raspberry Pi Pico boards.""" 16 17from dataclasses import asdict, dataclass 18import logging 19import os 20from pathlib import Path 21import platform 22import shutil 23import subprocess 24import sys 25from typing import Iterable, Optional, Sequence 26 27from ctypes.util import find_library as ctypes_find_library 28import serial.tools.list_ports 29import usb # type: ignore 30from usb.backend import libusb1 # type: ignore 31 32import pw_cli.log 33from pw_cli.env import pigweed_environment 34 35_LOG = logging.getLogger('pi_pico_detector') 36 37# Vendor/device ID to search for in USB devices. 38_RASPBERRY_PI_VENDOR_ID = 0x2E8A 39 40# RaspberryPi Debug probe: https://github.com/raspberrypi/debugprobe 41_DEBUG_PROBE_DEVICE_ID = 0x000C 42 43_RP2040_USB_SERIAL_DEVICE_ID = 0x000A 44_RP2040_BOOTLOADER_DEVICE_ID = 0x0003 45_PICO_USB_SERIAL_DEVICE_ID = 0x0009 46_PICO_BOOTLOADER_DEVICE_ID = 0x000F 47 48_PICO_USB_SERIAL_DEVICE_IDS = ( 49 _RP2040_USB_SERIAL_DEVICE_ID, 50 _PICO_USB_SERIAL_DEVICE_ID, 51) 52 53_PICO_BOOTLOADER_DEVICE_IDS = ( 54 _RP2040_BOOTLOADER_DEVICE_ID, 55 _PICO_BOOTLOADER_DEVICE_ID, 56) 57 58_PICO_DEVICE_IDS = ( 59 *_PICO_USB_SERIAL_DEVICE_IDS, 60 *_PICO_BOOTLOADER_DEVICE_IDS, 61) 62 63_ALL_DEVICE_IDS = ( 64 _DEBUG_PROBE_DEVICE_ID, 65 *_PICO_USB_SERIAL_DEVICE_IDS, 66 *_PICO_BOOTLOADER_DEVICE_IDS, 67) 68 69_LIBUSB_CIPD_INSTALL_ENV_VAR = 'PW_PIGWEED_CIPD_INSTALL_DIR' 70_LIBUSB_CIPD_SUBDIR = 'libexec' 71 72if platform.system() == 'Linux': 73 _LIB_SUFFIX = '.so' 74elif platform.system() == 'Darwin': 75 _LIB_SUFFIX = '.dylib' 76elif platform.system() == 'Windows': 77 _LIB_SUFFIX = '.dll' 78else: 79 _LOG.error('Unsupported platform.system(): %s', platform.system()) 80 sys.exit(1) 81 82 83def _custom_find_library(name: str) -> str | None: 84 """Search for shared libraries in non-standard locations.""" 85 search_paths: list[Path] = [] 86 87 # Add to search_paths starting with lowest priority locations. 88 89 if platform.system() == 'Darwin': 90 # libusb from homebrew 91 homebrew_prefix = os.environ.get('HOMEBREW_PREFIX', '') 92 if homebrew_prefix: 93 homebrew_lib = Path(homebrew_prefix) / 'lib' 94 homebrew_lib = homebrew_lib.expanduser().resolve() 95 if homebrew_lib.is_dir(): 96 search_paths.append(homebrew_lib) 97 98 # libusb from pkg-config 99 pkg_config_bin = shutil.which('pkg-config') 100 if pkg_config_bin: 101 # pkg-config often prefixes libraries with 'lib', check both. 102 for pkg_name in [f'lib{name}', name]: 103 pkg_config_command = [pkg_config_bin, '--variable=libdir', pkg_name] 104 process = subprocess.run( 105 pkg_config_command, 106 stdout=subprocess.PIPE, 107 stderr=subprocess.STDOUT, 108 ) 109 if process.returncode == 0: 110 pkg_config_libdir = Path( 111 process.stdout.decode('utf-8', errors='ignore').strip() 112 ) 113 if pkg_config_libdir.is_dir(): 114 search_paths.append(pkg_config_libdir) 115 break 116 117 # libusb provided by CIPD: 118 pw_env = pigweed_environment() 119 if _LIBUSB_CIPD_INSTALL_ENV_VAR in pw_env: 120 cipd_lib = ( 121 Path(getattr(pw_env, _LIBUSB_CIPD_INSTALL_ENV_VAR)) 122 / _LIBUSB_CIPD_SUBDIR 123 ) 124 if cipd_lib.is_dir(): 125 search_paths.append(cipd_lib) 126 127 # libusb provided by Bazel 128 try: 129 # pylint: disable=import-outside-toplevel 130 from python.runfiles import runfiles # type: ignore 131 132 r = runfiles.Create() 133 libusb_dir = os.path.dirname( 134 r.Rlocation( 135 f'libusb/libusb-1.0{_LIB_SUFFIX}', 136 r.CurrentRepository(), 137 ) 138 ) 139 search_paths.append(Path(libusb_dir)) 140 except ImportError: 141 pass 142 143 _LOG.debug('Potential shared library search paths:') 144 for path in search_paths: 145 _LOG.debug(path) 146 147 # Search for shared libraries in search_paths 148 for libdir in reversed(search_paths): 149 lib_results = sorted( 150 str(lib.resolve()) 151 for lib in libdir.iterdir() 152 if name in lib.name and _LIB_SUFFIX in lib.suffixes 153 ) 154 if lib_results: 155 _LOG.debug('Using %s located at: %s', name, lib_results[-1]) 156 # Return the highest lexigraphically sorted lib version 157 return lib_results[-1] 158 159 # Fallback to pyusb default of calling ctypes.util.find_library. 160 return ctypes_find_library(name) 161 162 163def _device_port_path(device: usb.core.Device) -> str: 164 """Returns the chain of ports that represent where a device is attached. 165 166 Example: 167 2:2:1 168 """ 169 return ":".join([str(port) for port in device.port_numbers]) 170 171 172def _libusb_raspberry_pi_devices( 173 bus_filter: int | None, 174 port_filter: str | None, 175 include_picos: bool = True, 176 include_debug_probes: bool = True, 177) -> Iterable[usb.core.Device]: 178 """Finds all Pi Pico-related USB devices.""" 179 devices_to_match: list[int] = [] 180 if include_picos: 181 devices_to_match.extend(_PICO_DEVICE_IDS) 182 if include_debug_probes: 183 devices_to_match.append(_DEBUG_PROBE_DEVICE_ID) 184 185 def _custom_match(d: usb.core.Device): 186 if d.idVendor != _RASPBERRY_PI_VENDOR_ID: 187 return False 188 if d.idProduct not in devices_to_match: 189 return False 190 if bus_filter is not None and d.bus != bus_filter: 191 return False 192 if port_filter is not None and _device_port_path(d) != port_filter: 193 return False 194 195 return True 196 197 return usb.core.find( 198 find_all=True, 199 custom_match=_custom_match, 200 backend=libusb1.get_backend(find_library=_custom_find_library), 201 ) 202 203 204@dataclass 205class PicoBoardInfo: 206 """Information about a connected Pi Pico board. 207 208 NOTE: As a Pico board is flashed and reset, the USB address can change. 209 Also, the bootloader has a different serial number than the regular 210 application firmware. For this reason, this object does NOT cache or store 211 the serial number, address, or USB device ID. 212 """ 213 214 bus: int 215 port: str 216 serial_port: Optional[str] 217 manufacturer: Optional[str] 218 product: Optional[str] 219 220 def address(self) -> int: 221 """Queries this device for its USB address. 222 223 WARNING: This is not necessarily stable, and may change whenever the 224 board is reset or flashed. 225 """ 226 for device in _libusb_raspberry_pi_devices( 227 bus_filter=self.bus, port_filter=self.port 228 ): 229 if device.idProduct not in _ALL_DEVICE_IDS: 230 _LOG.error( 231 'Unknown device type on bus %s port %s', self.bus, self.port 232 ) 233 if _device_port_path(device) == self.port: 234 return device.address 235 raise ValueError( 236 'No Pico found, it may have been disconnected or flashed with ' 237 'an incompatible application' 238 ) 239 240 @staticmethod 241 def vendor_id() -> int: 242 return _RASPBERRY_PI_VENDOR_ID 243 244 def is_debug_probe(self) -> bool: 245 return isinstance(self, PicoDebugProbeBoardInfo) 246 247 248@dataclass 249class PicoDebugProbeBoardInfo(PicoBoardInfo): 250 """Information about a connected Pi Debug Probe. 251 252 Unlike a Pi Pico, a Pi Debug Probe has a stable serial number and device ID. 253 """ 254 255 serial_number: str 256 257 @staticmethod 258 def device_id() -> int: 259 return _DEBUG_PROBE_DEVICE_ID 260 261 262@dataclass 263class _BoardSerialInfo: 264 """Object that ties a serial number to a serial com port.""" 265 266 serial_port: str 267 serial_number: str 268 269 270@dataclass 271class _BoardUsbInfo: 272 """Object that ties a serial number to other USB device information. 273 274 WARNING: This is private and ephemeral because many of these values are not 275 necessarily stable as a Pico is flashed and reset. Use PicoBoardInfo or 276 PicoDebugProbeBoardInfo for more stable representations of an attached Pico. 277 """ 278 279 serial_number: str 280 bus: int 281 port: str 282 product: str 283 manufacturer: str 284 vendor_id: int 285 product_id: int 286 287 @property 288 def in_bootloader_mode(self) -> bool: 289 return self.product_id in _PICO_BOOTLOADER_DEVICE_IDS 290 291 @property 292 def in_usb_serial_mode(self) -> bool: 293 return self.product_id in _PICO_USB_SERIAL_DEVICE_IDS 294 295 @property 296 def is_debug_probe(self) -> bool: 297 return self.product_id == _DEBUG_PROBE_DEVICE_ID 298 299 def __repr__(self) -> str: 300 return repr(asdict(self)) 301 302 303def _detect_pico_usb_info( 304 bus_filter: int | None = None, 305 port_filter: str | None = None, 306 include_picos: bool = True, 307 include_debug_probes: bool = True, 308) -> dict[str, _BoardUsbInfo]: 309 """Finds Raspberry Pi Pico devices and retrieves USB info for each one.""" 310 boards: dict[str, _BoardUsbInfo] = {} 311 devices = _libusb_raspberry_pi_devices( 312 bus_filter=bus_filter, 313 port_filter=port_filter, 314 include_picos=include_picos, 315 include_debug_probes=include_debug_probes, 316 ) 317 318 if not devices: 319 return boards 320 321 _LOG.debug('==> Detecting Raspberry Pi devices') 322 for device in devices: 323 try: 324 serial_number = device.serial_number 325 except ValueError as e: 326 _LOG.warning( 327 ' --> A connected device has an inaccessible ' 328 'serial number: %s', 329 e, 330 ) 331 continue 332 333 board_usb_info = _BoardUsbInfo( 334 serial_number=serial_number, 335 bus=device.bus, 336 port=_device_port_path(device), 337 product=device.product, 338 manufacturer=device.manufacturer, 339 vendor_id=device.idVendor, 340 product_id=device.idProduct, 341 ) 342 343 if board_usb_info.in_usb_serial_mode: 344 boards[serial_number] = board_usb_info 345 _LOG.debug( 346 ' --> Found a Pi Pico in USB serial mode: %s', board_usb_info 347 ) 348 349 elif board_usb_info.in_bootloader_mode: 350 boards[serial_number] = board_usb_info 351 _LOG.debug( 352 ' --> Found a Pi Pico in bootloader mode: %s', board_usb_info 353 ) 354 355 elif board_usb_info.is_debug_probe: 356 boards[serial_number] = board_usb_info 357 _LOG.debug( 358 ' --> Found Raspberry Pi debug probe: %s', board_usb_info 359 ) 360 if device.bcdDevice < 0x201: 361 _LOG.error( 362 'Reliable flashing and testing not possible due to ' 363 'outdated Debug Probe firmware (%d.%d.%d). Update to ' 364 'version 2.0.1 or later. See https://www.raspberrypi.com/' 365 'documentation/microcontrollers/debug-probe.html for ' 366 'update instructions.', 367 (device.bcdDevice >> 8 & 0xF), 368 (device.bcdDevice >> 4 & 0xF), 369 (device.bcdDevice & 0xF), 370 ) 371 372 else: 373 _LOG.warning( 374 ' --> Found unknown/incompatible Raspberry Pi: %s', 375 board_usb_info, 376 ) 377 378 return boards 379 380 381def _detect_pico_serial_ports() -> dict[str, _BoardSerialInfo]: 382 """Finds the serial com port associated with each Raspberry Pi Pico.""" 383 boards = {} 384 all_devs = serial.tools.list_ports.comports() 385 for dev in all_devs: 386 if dev.vid == _RASPBERRY_PI_VENDOR_ID and ( 387 dev.pid in _PICO_USB_SERIAL_DEVICE_IDS 388 or dev.pid == _DEBUG_PROBE_DEVICE_ID 389 ): 390 serial_number = dev.serial_number 391 if serial_number is None: 392 _LOG.error('Found pico with no serial number') 393 continue 394 boards[serial_number] = _BoardSerialInfo( 395 serial_port=dev.device, 396 serial_number=serial_number, 397 ) 398 return boards 399 400 401def board_from_usb_port( 402 bus: int, 403 port: str, 404 include_picos: bool = True, 405 include_debug_probes: bool = True, 406) -> PicoDebugProbeBoardInfo | PicoBoardInfo: 407 """Retrieves board info for the Pico at the specified USB bus and port. 408 409 Args: 410 bus: The USB bus that the requested Pico resides on. 411 port: The chain of ports as a colon separated list of integers (e.g. 412 '1:4:2:2') that the requested Pico resides on. This only performs 413 exact matches. 414 415 Returns: 416 The board at the requested bus/port. 417 """ 418 serial_devices = _detect_pico_serial_ports() 419 pico_usb_info = _detect_pico_usb_info( 420 include_picos=include_picos, 421 include_debug_probes=include_debug_probes, 422 bus_filter=bus, 423 port_filter=port, 424 ) 425 426 if not pico_usb_info: 427 raise ValueError(f'No matching device found on bus {bus} port {port}') 428 429 if len(pico_usb_info) > 1: 430 raise ValueError(f'Multiple Picos found on bus {bus} port {port}') 431 432 usb_info = next(iter(pico_usb_info.values())) 433 434 serial_port = None 435 if usb_info.serial_number in serial_devices: 436 serial_port = serial_devices[usb_info.serial_number].serial_port 437 438 if usb_info.is_debug_probe: 439 return PicoDebugProbeBoardInfo( 440 bus=usb_info.bus, 441 port=usb_info.port, 442 serial_port=serial_port, 443 serial_number=usb_info.serial_number, 444 product=usb_info.product, 445 manufacturer=usb_info.manufacturer, 446 ) 447 448 return PicoBoardInfo( 449 bus=usb_info.bus, 450 port=usb_info.port, 451 serial_port=serial_port, 452 product=usb_info.product, 453 manufacturer=usb_info.manufacturer, 454 ) 455 456 457def detect_boards( 458 include_picos: bool = True, include_debug_probes: bool = False 459) -> Sequence[PicoBoardInfo | PicoDebugProbeBoardInfo]: 460 """Detects attached Raspberry Pi Pico boards in USB serial mode. 461 462 Args: 463 include_picos: Whether or not to include detected Raspberry Pi Picos in 464 the list of enumerated devices. 465 include_debug_probes: Whether or not to include detected Raspberry Pi 466 debug probes in the list of enumerated devices. 467 468 Returns: 469 A list of all found boards. 470 """ 471 serial_devices = _detect_pico_serial_ports() 472 pico_usb_info = _detect_pico_usb_info( 473 include_picos=include_picos, 474 include_debug_probes=include_debug_probes, 475 ) 476 boards: list[PicoBoardInfo | PicoDebugProbeBoardInfo] = [] 477 for serial_number, usb_info in pico_usb_info.items(): 478 if not include_debug_probes and usb_info.is_debug_probe: 479 continue 480 481 if not include_picos and not usb_info.is_debug_probe: 482 continue 483 484 serial_port = None 485 if serial_number in serial_devices: 486 serial_port = serial_devices[serial_number].serial_port 487 488 if usb_info.is_debug_probe: 489 boards.append( 490 PicoDebugProbeBoardInfo( 491 bus=usb_info.bus, 492 port=usb_info.port, 493 serial_port=serial_port, 494 serial_number=usb_info.serial_number, 495 product=usb_info.product, 496 manufacturer=usb_info.manufacturer, 497 ) 498 ) 499 elif serial_port or usb_info.in_bootloader_mode: 500 boards.append( 501 PicoBoardInfo( 502 bus=usb_info.bus, 503 port=usb_info.port, 504 serial_port=serial_port, 505 product=usb_info.product, 506 manufacturer=usb_info.manufacturer, 507 ) 508 ) 509 510 return boards 511 512 513def main(): 514 """Detects and then prints all attached Raspberry Pi Picos.""" 515 pw_cli.log.install( 516 level=logging.DEBUG, use_color=True, hide_timestamp=False 517 ) 518 519 boards = detect_boards(include_picos=True, include_debug_probes=True) 520 if not boards: 521 _LOG.info('No attached boards detected') 522 for idx, board in enumerate(boards): 523 _LOG.info('Board %d:', idx) 524 _LOG.info(' %s', board) 525 526 527if __name__ == '__main__': 528 main() 529