• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2019 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""This script flashes and runs unit tests on stm32f429i-disc1 boards."""
16
17import argparse
18import logging
19import os
20import subprocess
21import sys
22import threading
23from typing import List
24
25import coloredlogs  # type: ignore
26import serial
27from stm32f429i_disc1_utils import stm32f429i_detector
28
29# Path used to access non-python resources in this python module.
30_DIR = os.path.dirname(__file__)
31
32# Path to default openocd configuration file.
33_OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f4xx.cfg')
34
35# Path to scripts provided by openocd.
36_OPENOCD_SCRIPTS_DIR = os.path.join(
37    os.getenv('PW_PIGWEED_CIPD_INSTALL_DIR', ''), 'share', 'openocd', 'scripts'
38)
39
40_LOG = logging.getLogger('unit_test_runner')
41
42# Verification of test pass/failure depends on these strings. If the formatting
43# or output of the simple_printing_event_handler changes, this may need to be
44# updated.
45_TESTS_STARTING_STRING = b'[==========] Running all tests.'
46_TESTS_DONE_STRING = b'[==========] Done running all tests.'
47_TEST_FAILURE_STRING = b'[  FAILED  ]'
48
49# How long to wait for the first byte of a test to be emitted. This is longer
50# than the user-configurable timeout as there's a delay while the device is
51# flashed.
52_FLASH_TIMEOUT = 5.0
53
54
55class TestingFailure(Exception):
56    """A simple exception to be raised when a testing step fails."""
57
58
59class DeviceNotFound(Exception):
60    """A simple exception to be raised when unable to connect to a device."""
61
62
63def parse_args():
64    """Parses command-line arguments."""
65
66    parser = argparse.ArgumentParser(description=__doc__)
67    parser.add_argument('binary', help='The target test binary to run')
68    parser.add_argument(
69        '--openocd-config',
70        default=_OPENOCD_CONFIG,
71        help='Path to openocd configuration file',
72    )
73    parser.add_argument(
74        '--stlink-serial',
75        default=None,
76        help='The serial number of the stlink to use when '
77        'flashing the target device',
78    )
79    parser.add_argument(
80        '--port',
81        default=None,
82        help='The name of the serial port to connect to when ' 'running tests',
83    )
84    parser.add_argument(
85        '--baud',
86        type=int,
87        default=115200,
88        help='Target baud rate to use for serial communication'
89        ' with target device',
90    )
91    parser.add_argument(
92        '--test-timeout',
93        type=float,
94        default=5.0,
95        help='Maximum communication delay in seconds before a '
96        'test is considered unresponsive and aborted',
97    )
98    parser.add_argument(
99        '--verbose',
100        '-v',
101        dest='verbose',
102        action="store_true",
103        help='Output additional logs as the script runs',
104    )
105
106    return parser.parse_args()
107
108
109def log_subprocess_output(level, output):
110    """Logs subprocess output line-by-line."""
111
112    lines = output.decode('utf-8', errors='replace').splitlines()
113    for line in lines:
114        _LOG.log(level, line)
115
116
117def reset_device(openocd_config, stlink_serial):
118    """Uses openocd to reset the attached device."""
119
120    # Name/path of openocd.
121    default_flasher = 'openocd'
122    flash_tool = os.getenv('OPENOCD_PATH', default_flasher)
123
124    cmd = [
125        flash_tool,
126        '-s',
127        _OPENOCD_SCRIPTS_DIR,
128        '-f',
129        openocd_config,
130        '-c',
131        'init',
132        '-c',
133        'reset run',
134        '-c',
135        'exit',
136    ]
137    _LOG.debug('Resetting device')
138
139    env = os.environ.copy()
140    if stlink_serial:
141        env['PW_STLINK_SERIAL'] = stlink_serial
142
143    # Disable GDB port to support multi-device testing.
144    env['PW_GDB_PORT'] = 'disabled'
145    process = subprocess.run(
146        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
147    )
148    if process.returncode:
149        log_subprocess_output(logging.ERROR, process.stdout)
150        raise TestingFailure('Failed to reset target device')
151
152    log_subprocess_output(logging.DEBUG, process.stdout)
153
154    _LOG.debug('Successfully reset device')
155
156
157def read_serial(port, baud_rate, test_timeout) -> bytes:
158    """Reads lines from a serial port until a line read times out.
159
160    Returns bytes object containing the read serial data.
161    """
162
163    serial_data = bytearray()
164    device = serial.Serial(
165        baudrate=baud_rate, port=port, timeout=_FLASH_TIMEOUT
166    )
167    if not device.is_open:
168        raise TestingFailure('Failed to open device')
169
170    # Flush input buffer and reset the device to begin the test.
171    device.reset_input_buffer()
172
173    # Block and wait for the first byte.
174    serial_data += device.read()
175    if not serial_data:
176        raise TestingFailure('Device not producing output')
177
178    device.timeout = test_timeout
179
180    # Read with a reasonable timeout until we stop getting characters.
181    while True:
182        bytes_read = device.readline()
183        if not bytes_read:
184            break
185        serial_data += bytes_read
186        if serial_data.rfind(_TESTS_DONE_STRING) != -1:
187            # Set to much more aggressive timeout since the last one or two
188            # lines should print out immediately. (one line if all fails or all
189            # passes, two lines if mixed.)
190            device.timeout = 0.01
191
192    # Remove carriage returns.
193    serial_data = serial_data.replace(b'\r', b'')
194
195    # Try to trim captured results to only contain most recent test run.
196    test_start_index = serial_data.rfind(_TESTS_STARTING_STRING)
197    return (
198        serial_data
199        if test_start_index == -1
200        else serial_data[test_start_index:]
201    )
202
203
204def flash_device(binary, openocd_config, stlink_serial):
205    """Flash binary to a connected device using the provided configuration."""
206
207    # Name/path of openocd.
208    default_flasher = 'openocd'
209    flash_tool = os.getenv('OPENOCD_PATH', default_flasher)
210
211    openocd_command = ' '.join(['program', binary, 'reset', 'exit'])
212    cmd = [
213        flash_tool,
214        '-s',
215        _OPENOCD_SCRIPTS_DIR,
216        '-f',
217        openocd_config,
218        '-c',
219        openocd_command,
220    ]
221    _LOG.info('Flashing firmware to device')
222
223    env = os.environ.copy()
224    if stlink_serial:
225        env['PW_STLINK_SERIAL'] = stlink_serial
226
227    # Disable GDB port to support multi-device testing.
228    env['PW_GDB_PORT'] = 'disabled'
229    process = subprocess.run(
230        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env
231    )
232    if process.returncode:
233        log_subprocess_output(logging.ERROR, process.stdout)
234        raise TestingFailure('Failed to flash target device')
235
236    log_subprocess_output(logging.DEBUG, process.stdout)
237
238    _LOG.debug('Successfully flashed firmware to device')
239
240
241def handle_test_results(test_output):
242    """Parses test output to determine whether tests passed or failed."""
243
244    if test_output.find(_TESTS_STARTING_STRING) == -1:
245        raise TestingFailure('Failed to find test start')
246
247    if test_output.rfind(_TESTS_DONE_STRING) == -1:
248        log_subprocess_output(logging.INFO, test_output)
249        raise TestingFailure('Tests did not complete')
250
251    if test_output.rfind(_TEST_FAILURE_STRING) != -1:
252        log_subprocess_output(logging.INFO, test_output)
253        raise TestingFailure('Test suite had one or more failures')
254
255    log_subprocess_output(logging.DEBUG, test_output)
256
257    _LOG.info('Test passed!')
258
259
260def _threaded_test_reader(dest, port, baud_rate, test_timeout):
261    """Parses test output to the mutable "dest" passed to this function."""
262    dest.append(read_serial(port, baud_rate, test_timeout))
263
264
265def run_device_test(
266    binary, test_timeout, openocd_config, baud, stlink_serial=None, port=None
267) -> bool:
268    """Flashes, runs, and checks an on-device test binary.
269
270    Returns true on test pass.
271    """
272
273    if stlink_serial is None and port is None:
274        _LOG.debug('Attempting to automatically detect dev board')
275        boards = stm32f429i_detector.detect_boards()
276        if not boards:
277            error = 'Could not find an attached device'
278            _LOG.error(error)
279            raise DeviceNotFound(error)
280        stlink_serial = boards[0].serial_number
281        port = boards[0].dev_name
282
283    _LOG.debug('Launching test binary %s', binary)
284    try:
285        # Begin capturing test output via another thread BEFORE flashing the
286        # device since the test will automatically run after the image is
287        # flashed. This reduces flake since there isn't a need to time a reset
288        # correctly relative to the start of capturing device output.
289        result: List[bytes] = []
290        threaded_reader_args = (result, port, baud, test_timeout)
291        read_thread = threading.Thread(
292            target=_threaded_test_reader, args=threaded_reader_args
293        )
294        read_thread.start()
295        _LOG.info('Running test')
296        flash_device(binary, openocd_config, stlink_serial)
297        read_thread.join()
298        if result:
299            handle_test_results(result[0])
300    except TestingFailure as err:
301        _LOG.error(err)
302        return False
303
304    return True
305
306
307def main():
308    """Set up runner, and then flash/run device test."""
309    args = parse_args()
310
311    # Try to use pw_cli logs, else default to something reasonable.
312    try:
313        import pw_cli.log  # pylint: disable=import-outside-toplevel
314
315        log_level = logging.DEBUG if args.verbose else logging.INFO
316        pw_cli.log.install(level=log_level)
317    except ImportError:
318        coloredlogs.install(
319            level='DEBUG' if args.verbose else 'INFO',
320            level_styles={'debug': {'color': 244}, 'error': {'color': 'red'}},
321            fmt='%(asctime)s %(levelname)s | %(message)s',
322        )
323
324    if run_device_test(
325        args.binary,
326        args.test_timeout,
327        args.openocd_config,
328        args.baud,
329        args.stlink_serial,
330        args.port,
331    ):
332        sys.exit(0)
333    else:
334        sys.exit(1)
335
336
337if __name__ == '__main__':
338    main()
339