#!/usr/bin/env python3
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Detects attached Raspberry Pi Pico boards."""

from dataclasses import asdict, dataclass
import logging
import os
from pathlib import Path
import platform
import shutil
import subprocess
import sys
from typing import Iterable, Optional, Sequence

from ctypes.util import find_library as ctypes_find_library
import serial.tools.list_ports
import usb  # type: ignore
from usb.backend import libusb1  # type: ignore

import pw_cli.log
from pw_cli.env import pigweed_environment

_LOG = logging.getLogger('pi_pico_detector')

# Vendor/device ID to search for in USB devices.
_RASPBERRY_PI_VENDOR_ID = 0x2E8A

# RaspberryPi Debug probe: https://github.com/raspberrypi/debugprobe
_DEBUG_PROBE_DEVICE_ID = 0x000C

_PICO_USB_SERIAL_DEVICE_ID = 0x000A
_PICO_BOOTLOADER_DEVICE_ID = 0x0003
_PICO_DEVICE_IDS = (
    _PICO_USB_SERIAL_DEVICE_ID,
    _PICO_BOOTLOADER_DEVICE_ID,
)

_ALL_DEVICE_IDS = (
    _DEBUG_PROBE_DEVICE_ID,
    _PICO_USB_SERIAL_DEVICE_ID,
    _PICO_BOOTLOADER_DEVICE_ID,
)

_LIBUSB_CIPD_INSTALL_ENV_VAR = 'PW_PIGWEED_CIPD_INSTALL_DIR'
_LIBUSB_CIPD_SUBDIR = 'libexec'

# Shared library file suffix used when scanning directories for libusb.
if platform.system() == 'Linux':
    _LIB_SUFFIX = '.so'
elif platform.system() == 'Darwin':
    _LIB_SUFFIX = '.dylib'
elif platform.system() == 'Windows':
    _LIB_SUFFIX = '.dll'
else:
    _LOG.error('Unsupported platform.system(): %s', platform.system())
    sys.exit(1)


def _pkg_config_libdir(pkg_config_bin: str, name: str) -> Path | None:
    """Returns the existing libdir pkg-config reports for a library, if any.

    Args:
        pkg_config_bin: Path to the pkg-config executable.
        name: Library name; both 'lib<name>' and '<name>' are queried.

    Returns:
        The first reported libdir that is an existing directory, else None.
    """
    # pkg-config often prefixes libraries with 'lib', check both.
    for pkg_name in (f'lib{name}', name):
        process = subprocess.run(
            [pkg_config_bin, '--variable=libdir', pkg_name],
            stdout=subprocess.PIPE,
            # Keep diagnostics out of the captured output so that only the
            # variable's value is parsed as a path.
            stderr=subprocess.DEVNULL,
            check=False,
        )
        if process.returncode != 0:
            continue

        libdir_text = process.stdout.decode('utf-8', errors='ignore').strip()
        # Path('') is the current working directory, which would pass the
        # is_dir() check below; never treat empty output as a libdir.
        if not libdir_text:
            continue

        libdir = Path(libdir_text)
        if libdir.is_dir():
            return libdir

    return None


def _custom_find_library(name: str) -> str | None:
    """Search for shared libraries in non-standard locations.

    Candidate directories are collected from Homebrew (macOS only),
    pkg-config and the Pigweed CIPD install directory, then searched in the
    reverse order (CIPD first).

    Args:
        name: Library name to look for; matched as a substring of file names.

    Returns:
        The resolved path of the matching library, or whatever
        ctypes.util.find_library returns if none of the directories has one.
    """
    search_paths: list[Path] = []

    # Add to search_paths starting with lowest priority locations.

    if platform.system() == 'Darwin':
        # libusb from homebrew
        homebrew_prefix = os.environ.get('HOMEBREW_PREFIX', '')
        if homebrew_prefix:
            homebrew_lib = Path(homebrew_prefix) / 'lib'
            homebrew_lib = homebrew_lib.expanduser().resolve()
            if homebrew_lib.is_dir():
                search_paths.append(homebrew_lib)

    # libusb from pkg-config
    pkg_config_bin = shutil.which('pkg-config')
    if pkg_config_bin:
        pkg_config_libdir = _pkg_config_libdir(pkg_config_bin, name)
        if pkg_config_libdir is not None:
            search_paths.append(pkg_config_libdir)

    # libusb provided by CIPD:
    pw_env = pigweed_environment()
    if _LIBUSB_CIPD_INSTALL_ENV_VAR in pw_env:
        cipd_lib = (
            Path(getattr(pw_env, _LIBUSB_CIPD_INSTALL_ENV_VAR))
            / _LIBUSB_CIPD_SUBDIR
        )
        if cipd_lib.is_dir():
            search_paths.append(cipd_lib)

    _LOG.debug('Potential shared library search paths:')
    for path in search_paths:
        _LOG.debug('%s', path)

    # Search for shared libraries in search_paths, highest priority first.
    for libdir in reversed(search_paths):
        lib_results = sorted(
            str(lib.resolve())
            for lib in libdir.iterdir()
            if name in lib.name and _LIB_SUFFIX in lib.suffixes
        )
        if lib_results:
            _LOG.debug('Using %s located at: %s', name, lib_results[-1])
            # Return the highest lexicographically sorted lib version
            return lib_results[-1]

    # Fallback to pyusb default of calling ctypes.util.find_library.
    return ctypes_find_library(name)


def _device_port_path(device: usb.core.Device) -> str:
    """Returns the chain of ports that represent where a device is attached.

    Example:
        2:2:1
    """
    return ":".join(str(port) for port in device.port_numbers)


def _libusb_raspberry_pi_devices(
    bus_filter: int | None,
    port_filter: str | None,
    include_picos: bool = True,
    include_debug_probes: bool = True,
) -> Iterable[usb.core.Device]:
    """Finds all Pi Pico-related USB devices.

    Args:
        bus_filter: If not None, only devices on this USB bus are returned.
        port_filter: If not None, only devices whose port chain (see
            _device_port_path) equals this string are returned.
        include_picos: Whether to match Picos (USB serial or bootloader).
        include_debug_probes: Whether to match Raspberry Pi debug probes.

    Returns:
        The result of usb.core.find() for the matching devices.
    """
    devices_to_match: list[int] = []
    if include_picos:
        devices_to_match.extend(_PICO_DEVICE_IDS)
    if include_debug_probes:
        devices_to_match.append(_DEBUG_PROBE_DEVICE_ID)

    def _custom_match(d: usb.core.Device) -> bool:
        if d.idVendor != _RASPBERRY_PI_VENDOR_ID:
            return False
        if d.idProduct not in devices_to_match:
            return False
        if bus_filter is not None and d.bus != bus_filter:
            return False
        if port_filter is not None and _device_port_path(d) != port_filter:
            return False

        return True

    return usb.core.find(
        find_all=True,
        custom_match=_custom_match,
        backend=libusb1.get_backend(find_library=_custom_find_library),
    )


@dataclass
class PicoBoardInfo:
    """Information about a connected Pi Pico board.

    NOTE: As a Pico board is flashed and reset, the USB address can change.
    Also, the bootloader has a different serial number than the regular
    application firmware. For this reason, this object does NOT cache or store
    the serial number, address, or USB device ID.
    """

    bus: int
    port: str
    serial_port: Optional[str]

    def address(self) -> int:
        """Queries this device for its USB address.

        WARNING: This is not necessarily stable, and may change whenever the
        board is reset or flashed.

        Raises:
            ValueError: No matching device is attached at this bus and port.
        """
        for device in _libusb_raspberry_pi_devices(
            bus_filter=self.bus, port_filter=self.port
        ):
            if device.idProduct not in _ALL_DEVICE_IDS:
                _LOG.error(
                    'Unknown device type on bus %s port %s', self.bus, self.port
                )
            if _device_port_path(device) == self.port:
                return device.address
        raise ValueError(
            'No Pico found, it may have been disconnected or flashed with '
            'an incompatible application'
        )

    @staticmethod
    def vendor_id() -> int:
        """Returns the Raspberry Pi USB vendor ID."""
        return _RASPBERRY_PI_VENDOR_ID

    def is_debug_probe(self) -> bool:
        """Returns True if this object is a PicoDebugProbeBoardInfo."""
        return isinstance(self, PicoDebugProbeBoardInfo)


@dataclass
class PicoDebugProbeBoardInfo(PicoBoardInfo):
    """Information about a connected Pi Debug Probe.

    Unlike a Pi Pico, a Pi Debug Probe has a stable serial number and device ID.
    """

    serial_number: str

    @staticmethod
    def device_id() -> int:
        """Returns the USB product ID of the debug probe."""
        return _DEBUG_PROBE_DEVICE_ID


@dataclass
class _BoardSerialInfo:
    """Object that ties a serial number to a serial com port."""

    serial_port: str
    serial_number: str


@dataclass
class _BoardUsbInfo:
    """Object that ties a serial number to other USB device information.

    WARNING: This is private and ephemeral because many of these values are not
    necessarily stable as a Pico is flashed and reset. Use PicoBoardInfo or
    PicoDebugProbeBoardInfo for more stable representations of an attached Pico.
    """

    serial_number: str
    bus: int
    port: str
    product: str
    manufacturer: str
    vendor_id: int
    product_id: int

    @property
    def in_bootloader_mode(self) -> bool:
        return self.product_id == _PICO_BOOTLOADER_DEVICE_ID

    @property
    def in_usb_serial_mode(self) -> bool:
        return self.product_id == _PICO_USB_SERIAL_DEVICE_ID

    @property
    def is_debug_probe(self) -> bool:
        return self.product_id == _DEBUG_PROBE_DEVICE_ID

    def __repr__(self) -> str:
        return repr(asdict(self))


def _detect_pico_usb_info(
    bus_filter: int | None = None,
    port_filter: str | None = None,
    include_picos: bool = True,
    include_debug_probes: bool = True,
) -> dict[str, _BoardUsbInfo]:
    """Finds Raspberry Pi Pico devices and retrieves USB info for each one.

    Args:
        bus_filter: If not None, only consider devices on this USB bus.
        port_filter: If not None, only consider devices at this port chain.
        include_picos: Whether to look for Picos.
        include_debug_probes: Whether to look for debug probes.

    Returns:
        Detected devices keyed by USB serial number. Devices whose serial
        number cannot be read are skipped with a warning.
    """
    boards: dict[str, _BoardUsbInfo] = {}
    devices = _libusb_raspberry_pi_devices(
        bus_filter=bus_filter,
        port_filter=port_filter,
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
    )

    if not devices:
        return boards

    _LOG.debug('==> Detecting Raspberry Pi devices')
    for device in devices:
        try:
            serial_number = device.serial_number
        # Besides ValueError, a failed descriptor transfer surfaces as
        # usb.core.USBError; one unreadable device should not abort detection
        # of the remaining ones.
        except (ValueError, usb.core.USBError) as e:
            _LOG.warning(
                '  --> A connected device has an inaccessible '
                'serial number: %s',
                e,
            )
            continue

        board_usb_info = _BoardUsbInfo(
            serial_number=serial_number,
            bus=device.bus,
            port=_device_port_path(device),
            product=device.product,
            manufacturer=device.manufacturer,
            vendor_id=device.idVendor,
            product_id=device.idProduct,
        )

        if board_usb_info.in_usb_serial_mode:
            boards[serial_number] = board_usb_info
            _LOG.debug(
                '  --> Found a Pi Pico in USB serial mode: %s', board_usb_info
            )

        elif board_usb_info.in_bootloader_mode:
            boards[serial_number] = board_usb_info
            _LOG.debug(
                '  --> Found a Pi Pico in bootloader mode: %s', board_usb_info
            )

        elif board_usb_info.is_debug_probe:
            boards[serial_number] = board_usb_info
            _LOG.debug(
                '  --> Found Raspberry Pi debug probe: %s', board_usb_info
            )

        else:
            _LOG.warning(
                '  --> Found unknown/incompatible Raspberry Pi: %s',
                board_usb_info,
            )

    return boards


def _detect_pico_serial_ports() -> dict[str, _BoardSerialInfo]:
    """Finds the serial com port associated with each Raspberry Pi Pico.

    Returns:
        Serial port info keyed by serial number, for every com port that
        belongs to a Pico in USB serial mode or to a debug probe.
    """
    boards: dict[str, _BoardSerialInfo] = {}
    all_devs = serial.tools.list_ports.comports()
    for dev in all_devs:
        if dev.vid == _RASPBERRY_PI_VENDOR_ID and (
            dev.pid == _PICO_USB_SERIAL_DEVICE_ID
            or dev.pid == _DEBUG_PROBE_DEVICE_ID
        ):
            serial_number = dev.serial_number
            if serial_number is None:
                _LOG.error('Found pico with no serial number')
                continue
            boards[serial_number] = _BoardSerialInfo(
                serial_port=dev.device,
                serial_number=serial_number,
            )
    return boards


def board_from_usb_port(
    bus: int,
    port: str,
    include_picos: bool = True,
    include_debug_probes: bool = True,
) -> PicoDebugProbeBoardInfo | PicoBoardInfo:
    """Retrieves board info for the Pico at the specified USB bus and port.

    Args:
        bus: The USB bus that the requested Pico resides on.
        port: The chain of ports as a colon separated list of integers (e.g.
            '1:4:2:2') that the requested Pico resides on. This only performs
            exact matches.
        include_picos: Whether or not Raspberry Pi Picos are eligible matches.
        include_debug_probes: Whether or not Raspberry Pi debug probes are
            eligible matches.

    Returns:
        The board at the requested bus/port.

    Raises:
        ValueError: No device, or more than one device, matched the requested
            bus/port.
    """
    serial_devices = _detect_pico_serial_ports()
    pico_usb_info = _detect_pico_usb_info(
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
        bus_filter=bus,
        port_filter=port,
    )

    if not pico_usb_info:
        raise ValueError(f'No matching device found on bus {bus} port {port}')

    if len(pico_usb_info) > 1:
        raise ValueError(f'Multiple Picos found on bus {bus} port {port}')

    usb_info = next(iter(pico_usb_info.values()))

    # The serial port stays None when no com port reports this serial number.
    serial_port = None
    if usb_info.serial_number in serial_devices:
        serial_port = serial_devices[usb_info.serial_number].serial_port

    if usb_info.is_debug_probe:
        return PicoDebugProbeBoardInfo(
            bus=usb_info.bus,
            port=usb_info.port,
            serial_port=serial_port,
            serial_number=usb_info.serial_number,
        )

    return PicoBoardInfo(
        bus=usb_info.bus,
        port=usb_info.port,
        serial_port=serial_port,
    )


def detect_boards(
    include_picos: bool = True, include_debug_probes: bool = False
) -> Sequence[PicoBoardInfo | PicoDebugProbeBoardInfo]:
    """Detects attached Raspberry Pi Pico boards and debug probes.

    A Pico is reported when it either has an associated serial com port or is
    in bootloader mode. Debug probes are reported regardless of whether a
    serial com port was found for them.

    Args:
        include_picos: Whether or not to include detected Raspberry Pi Picos in
            the list of enumerated devices.
        include_debug_probes: Whether or not to include detected Raspberry Pi
            debug probes in the list of enumerated devices.

    Returns:
        A list of all found boards.
    """
    serial_devices = _detect_pico_serial_ports()
    pico_usb_info = _detect_pico_usb_info(
        include_picos=include_picos,
        include_debug_probes=include_debug_probes,
    )
    boards: list[PicoBoardInfo | PicoDebugProbeBoardInfo] = []
    for serial_number, usb_info in pico_usb_info.items():
        if not include_debug_probes and usb_info.is_debug_probe:
            continue

        if not include_picos and not usb_info.is_debug_probe:
            continue

        serial_port = None
        if serial_number in serial_devices:
            serial_port = serial_devices[serial_number].serial_port

        if usb_info.is_debug_probe:
            boards.append(
                PicoDebugProbeBoardInfo(
                    bus=usb_info.bus,
                    port=usb_info.port,
                    serial_port=serial_port,
                    serial_number=usb_info.serial_number,
                )
            )
        elif serial_port or usb_info.in_bootloader_mode:
            boards.append(
                PicoBoardInfo(
                    bus=usb_info.bus,
                    port=usb_info.port,
                    serial_port=serial_port,
                )
            )

    return boards


def main() -> None:
    """Detects and then prints all attached Raspberry Pi Picos."""
    pw_cli.log.install(
        level=logging.DEBUG, use_color=True, hide_timestamp=False
    )

    boards = detect_boards(include_picos=True, include_debug_probes=True)
    if not boards:
        _LOG.info('No attached boards detected')
    for idx, board in enumerate(boards):
        _LOG.info('Board %d:', idx)
        _LOG.info('  %s', board)


if __name__ == '__main__':
    main()