• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""This script flashes and runs unit tests on Raspberry Pi Pico boards."""
16
17import argparse
18import logging
19import subprocess
20import sys
21import time
22from pathlib import Path
23
24import serial  # type: ignore
25
26import pw_cli.log
27from pw_unit_test import serial_test_runner
28from pw_unit_test.serial_test_runner import (
29    SerialTestingDevice,
30    DeviceNotFound,
31)
32from rp2040_utils import device_detector
33from rp2040_utils.device_detector import PicoBoardInfo, PicoDebugProbeBoardInfo
34
35
36_LOG = logging.getLogger()
37
38
39def parse_args():
40    """Parses command-line arguments."""
41
42    parser = argparse.ArgumentParser(description=__doc__)
43    parser.add_argument(
44        'binary', type=Path, help='The target test binary to run'
45    )
46    parser.add_argument(
47        '--usb-bus',
48        type=int,
49        help='The bus this Pi Pico is on',
50    )
51    parser.add_argument(
52        '--usb-port',
53        type=str,
54        help=(
55            'The port chain as a colon-separated list of integers of this Pi '
56            'Pico on the specified USB bus (e.g. 1:4:2:2)'
57        ),
58    )
59    parser.add_argument(
60        '-b',
61        '--baud',
62        type=int,
63        default=115200,
64        help='Baud rate to use for serial communication with target device',
65    )
66    parser.add_argument(
67        '--test-timeout',
68        type=float,
69        default=5.0,
70        help='Maximum communication delay in seconds before a '
71        'test is considered unresponsive and aborted',
72    )
73    parser.add_argument(
74        '--debug-probe-only',
75        action='store_true',
76        help='Only run tests on detected Pi Pico debug probes',
77    )
78    parser.add_argument(
79        '--pico-only',
80        action='store_true',
81        help='Only run tests on detected Pi Pico boards',
82    )
83    parser.add_argument(
84        '--verbose',
85        '-v',
86        dest='verbose',
87        action='store_true',
88        help='Output additional logs as the script runs',
89    )
90
91    return parser.parse_args()
92
93
94class PiPicoTestingDevice(SerialTestingDevice):
95    """A SerialTestingDevice implementation for the Pi Pico."""
96
97    def __init__(
98        self,
99        board_info: PicoBoardInfo | PicoDebugProbeBoardInfo,
100        baud_rate=115200,
101    ):
102        self._board_info = board_info
103        self._baud_rate = baud_rate
104
105    @staticmethod
106    def _find_elf(binary: Path) -> Path | None:
107        """Attempt to find and return the path to an ELF file for a binary.
108
109        Args:
110          binary: A relative path to a binary.
111
112        Returns the path to the associated ELF file, or None if none was found.
113        """
114        if binary.suffix == '.elf' or not binary.suffix:
115            return binary
116        choices = (
117            binary.parent / f'{binary.stem}.elf',
118            binary.parent / 'bin' / f'{binary.stem}.elf',
119            binary.parent / 'test' / f'{binary.stem}.elf',
120        )
121        for choice in choices:
122            if choice.is_file():
123                return choice
124
125        _LOG.error(
126            'Cannot find ELF file to use as a token database for binary: %s',
127            binary,
128        )
129        return None
130
131    def load_binary(self, binary: Path) -> bool:
132        """Flash a binary to this device, returning success or failure."""
133        if self._board_info.is_debug_probe():
134            return self.load_debugprobe_binary(binary)
135        return self.load_picotool_binary(binary)
136
137    def load_debugprobe_binary(self, binary: Path) -> bool:
138        """Flash a binary to this device using a debug probe, returning success
139        or failure."""
140        elf_path = self._find_elf(binary)
141        if not elf_path:
142            return False
143
144        if not isinstance(self._board_info, PicoDebugProbeBoardInfo):
145            return False
146
147        # `probe-rs` takes a `--probe` argument of the form:
148        #  <vendor_id>:<product_id>:<serial_number>
149        probe = "{:04x}:{:04x}:{}".format(
150            self._board_info.vendor_id(),
151            self._board_info.device_id(),
152            self._board_info.serial_number,
153        )
154
155        download_cmd = (
156            'probe-rs',
157            'download',
158            '--probe',
159            probe,
160            '--chip',
161            'RP2040',
162            '--speed',
163            '10000',
164            str(elf_path),
165        )
166        _LOG.debug('Flashing ==> %s', ' '.join(download_cmd))
167        process = subprocess.run(
168            download_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
169        )
170        if process.returncode:
171            err = (
172                'Flashing command failed: ' + ' '.join(download_cmd),
173                str(self._board_info),
174                process.stdout.decode('utf-8', errors='replace'),
175            )
176            _LOG.error('\n\n'.join(err))
177            return False
178
179        reset_cmd = (
180            'probe-rs',
181            'reset',
182            '--probe',
183            probe,
184            '--chip',
185            'RP2040',
186        )
187        _LOG.debug('Resetting ==> %s', ' '.join(reset_cmd))
188        process = subprocess.run(
189            reset_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
190        )
191        if process.returncode:
192            err = (
193                'Resetting command failed: ' + ' '.join(reset_cmd),
194                str(self._board_info),
195                process.stdout.decode('utf-8', errors='replace'),
196            )
197            _LOG.error('\n\n'.join(err))
198            return False
199
200        # Give time for the device to reset.  Ideally the common unit test
201        # runner would wait for input but this is not the case.
202        time.sleep(0.5)
203
204        return True
205
206    def load_picotool_binary(self, binary: Path) -> bool:
207        """Flash a binary to this device using picotool, returning success or
208        failure."""
209        cmd = (
210            'picotool',
211            'load',
212            '-x',
213            str(binary),
214            '--bus',
215            str(self._board_info.bus),
216            '--address',
217            str(self._board_info.address()),
218            '-F',
219        )
220        _LOG.debug('Flashing ==> %s', ' '.join(cmd))
221        process = subprocess.run(
222            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
223        )
224        if process.returncode:
225            err = (
226                'Flashing command failed: ' + ' '.join(cmd),
227                str(self._board_info),
228                process.stdout.decode('utf-8', errors='replace'),
229            )
230            _LOG.error('\n\n'.join(err))
231            return False
232
233        start_time = time.monotonic()
234        load_binary_timeout = 10.0
235        # Wait for serial port to enumerate. This will retry forever.
236        while time.monotonic() - start_time < load_binary_timeout:
237            # If the serial port path isn't known, watch for a newly
238            # enmerated path.
239            if not self.serial_port() or self.serial_port() == 'None':
240                # Wait a bit before checking for a new port.
241                time.sleep(0.3)
242                # Check for updated serial port path.
243                for device in device_detector.detect_boards():
244                    if (
245                        device.bus == self._board_info.bus
246                        and device.port == self._board_info.port
247                    ):
248                        self._board_info.serial_port = device.serial_port
249                        # Serial port found, break out of device for loop.
250                        break
251
252            # Serial port known try to connect to it.
253            if self.serial_port():
254                # Connect to the serial port.
255                try:
256                    serial.Serial(
257                        baudrate=self.baud_rate(), port=self.serial_port()
258                    )
259                    return True
260                except serial.serialutil.SerialException:
261                    # Unable to connect, try again.
262                    _LOG.debug(
263                        'Unable to connect to %s, retrying', self.serial_port()
264                    )
265                    time.sleep(0.1)
266
267        _LOG.error(
268            'Binary flashed but unable to connect to the serial port: %s',
269            self.serial_port(),
270        )
271        return False
272
273    def serial_port(self) -> str:
274        if not self._board_info.serial_port:
275            return 'None'
276        return self._board_info.serial_port
277
278    def baud_rate(self) -> int:
279        return self._baud_rate
280
281
282def _run_test(
283    device: PiPicoTestingDevice, binary: Path, test_timeout: float
284) -> bool:
285    return serial_test_runner.run_device_test(device, binary, test_timeout)
286
287
288def run_device_test(
289    binary: Path,
290    test_timeout: float,
291    baud_rate: int,
292    usb_bus: int,
293    usb_port: str,
294) -> bool:
295    """Flashes, runs, and checks an on-device test binary.
296
297    Returns true on test pass.
298    """
299    board = device_detector.board_from_usb_port(usb_bus, usb_port)
300    return _run_test(
301        PiPicoTestingDevice(board, baud_rate), binary, test_timeout
302    )
303
304
305def detect_and_run_test(
306    binary: Path,
307    test_timeout: float,
308    baud_rate: int,
309    include_picos: bool = True,
310    include_debug_probes: bool = True,
311):
312    _LOG.debug('Attempting to automatically detect dev board')
313    boards = device_detector.detect_boards(
314        include_picos=include_picos,
315        include_debug_probes=include_debug_probes,
316    )
317    if not boards:
318        error = 'Could not find an attached device'
319        _LOG.error(error)
320        raise DeviceNotFound(error)
321    return _run_test(
322        PiPicoTestingDevice(boards[0], baud_rate), binary, test_timeout
323    )
324
325
326def main():
327    """Set up runner, and then flash/run device test."""
328    args = parse_args()
329    test_logfile = args.binary.with_suffix(args.binary.suffix + '.test_log.txt')
330    # Truncate existing logfile.
331    test_logfile.write_text('', encoding='utf-8')
332    pw_cli.log.install(
333        level=logging.DEBUG if args.verbose else logging.INFO,
334        debug_log=test_logfile,
335    )
336    _LOG.debug('Logging results to %s', test_logfile)
337
338    if args.pico_only and args.debug_probe_only:
339        _LOG.critical('Cannot specify both --pico-only and --debug-probe-only')
340        sys.exit(1)
341
342    # For now, require manual configurations to be fully specified.
343    if (args.usb_port is not None or args.usb_bus is not None) and not (
344        args.usb_port is not None and args.usb_bus is not None
345    ):
346        _LOG.critical(
347            'Must specify BOTH --usb-bus and --usb-port when manually '
348            'specifying a device'
349        )
350        sys.exit(1)
351
352    test_passed = False
353    if not args.usb_bus:
354        test_passed = detect_and_run_test(
355            args.binary,
356            args.test_timeout,
357            args.baud,
358            not args.debug_probe_only,
359            not args.pico_only,
360        )
361    else:
362        test_passed = run_device_test(
363            args.binary,
364            args.test_timeout,
365            args.baud,
366            args.usb_bus,
367            args.usb_port,
368        )
369
370        sys.exit(0 if test_passed else 1)
371
372
373if __name__ == '__main__':
374    main()
375