• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2024 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""This script flashes and runs RPC unit tests on Raspberry Pi Pico boards."""
16
17import logging
18import sys
19from pathlib import Path
20
21import serial  # type: ignore
22
23import pw_cli.log
24from pw_hdlc import rpc
25from pw_log.proto import log_pb2
26from pw_rpc.callback_client.errors import RpcTimeout
27from pw_system import device
28from pw_tokenizer import detokenize
29from pw_unit_test_proto import unit_test_pb2
30
31from rp2040_utils import device_detector
32from rp2040_utils.unit_test_runner import PiPicoTestingDevice, parse_args
33
34
35_LOG = logging.getLogger()
36
37
38class PiPicoRpcTestingDevice(PiPicoTestingDevice):
39    """An RPC test runner implementation for the Pi Pico."""
40
41    def run_device_test(self, binary: Path, timeout: float) -> bool:
42        """Run an RPC unit test on this device.
43
44        Returns whether it succeeded.
45        """
46        if not self.load_binary(binary):
47            return False
48        serial_device = serial.Serial(
49            self.serial_port(), self.baud_rate(), timeout=0.1
50        )
51        reader = rpc.SerialReader(serial_device, 8192)
52        elf_path = self._find_elf(binary)
53        if not elf_path:
54            return False
55        with device.Device(
56            channel_id=rpc.DEFAULT_CHANNEL_ID,
57            reader=reader,
58            write=serial_device.write,
59            proto_library=[log_pb2, unit_test_pb2],
60            detokenizer=detokenize.Detokenizer(elf_path),
61        ) as dev:
62            try:
63                test_results = dev.run_tests(timeout)
64                _LOG.info('Test run complete')
65            except RpcTimeout:
66                _LOG.error('Test timed out after %s seconds.', timeout)
67                return False
68            if not test_results.all_tests_passed():
69                return False
70        return True
71
72
73def run_device_test(
74    binary: Path,
75    test_timeout: float,
76    baud_rate: int,
77    usb_bus: int,
78    usb_port: str,
79) -> bool:
80    """Flash, run, and check an on-device test binary.
81
82    Returns true on test pass.
83    """
84    board = device_detector.board_from_usb_port(usb_bus, usb_port)
85    return PiPicoRpcTestingDevice(board, baud_rate).run_device_test(
86        binary, test_timeout
87    )
88
89
90def detect_and_run_test(
91    binary: Path,
92    test_timeout: float,
93    baud_rate: int,
94    include_picos: bool = True,
95    include_debug_probes: bool = True,
96) -> bool:
97    """Detect a dev board and run a test binary on it.
98
99    Returns whether or not the test completed successfully.
100    """
101    _LOG.debug('Attempting to automatically detect dev board')
102    boards = device_detector.detect_boards(
103        include_picos=include_picos,
104        include_debug_probes=include_debug_probes,
105    )
106    if not boards:
107        _LOG.error('Could not find an attached device')
108        return False
109    return PiPicoRpcTestingDevice(boards[0], baud_rate).run_device_test(
110        binary, test_timeout
111    )
112
113
114def main():
115    """Set up runner and then flash/run device test."""
116    args = parse_args()
117    test_logfile = args.binary.with_suffix(args.binary.suffix + '.test_log.txt')
118    # Truncate existing logfile.
119    test_logfile.write_text('', encoding='utf-8')
120    pw_cli.log.install(
121        level=logging.DEBUG if args.verbose else logging.INFO,
122        debug_log=test_logfile,
123    )
124    _LOG.debug('Logging results to %s', test_logfile)
125
126    if args.pico_only and args.debug_probe_only:
127        _LOG.critical('Cannot specify both --pico-only and --debug-probe-only')
128        sys.exit(1)
129
130    # For now, require manual configurations to be fully specified.
131    if (args.usb_port is not None or args.usb_bus is not None) and not (
132        args.usb_port is not None and args.usb_bus is not None
133    ):
134        _LOG.critical(
135            'Must specify BOTH --usb-bus and --usb-port when manually '
136            'specifying a device'
137        )
138        sys.exit(1)
139
140    test_passed = False
141    if not args.usb_bus:
142        test_passed = detect_and_run_test(
143            args.binary,
144            args.test_timeout,
145            args.baud,
146            not args.debug_probe_only,
147            not args.pico_only,
148        )
149    else:
150        test_passed = run_device_test(
151            args.binary,
152            args.test_timeout,
153            args.baud,
154            args.usb_bus,
155            args.usb_port,
156        )
157    sys.exit(0 if test_passed else 1)
158
159
160if __name__ == '__main__':
161    main()
162