1#!/usr/bin/env python3 2# Copyright 2019 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""This script flashes and runs unit tests on stm32f429i-disc1 boards.""" 16 17import argparse 18import logging 19import os 20import subprocess 21import sys 22import threading 23from typing import List 24 25import coloredlogs # type: ignore 26import serial 27from stm32f429i_disc1_utils import stm32f429i_detector 28 29# Path used to access non-python resources in this python module. 30_DIR = os.path.dirname(__file__) 31 32# Path to default openocd configuration file. 33_OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f4xx.cfg') 34 35# Path to scripts provided by openocd. 36_OPENOCD_SCRIPTS_DIR = os.path.join( 37 os.getenv('PW_PIGWEED_CIPD_INSTALL_DIR', ''), 'share', 'openocd', 'scripts' 38) 39 40_LOG = logging.getLogger('unit_test_runner') 41 42# Verification of test pass/failure depends on these strings. If the formatting 43# or output of the simple_printing_event_handler changes, this may need to be 44# updated. 45_TESTS_STARTING_STRING = b'[==========] Running all tests.' 46_TESTS_DONE_STRING = b'[==========] Done running all tests.' 47_TEST_FAILURE_STRING = b'[ FAILED ]' 48 49# How long to wait for the first byte of a test to be emitted. This is longer 50# than the user-configurable timeout as there's a delay while the device is 51# flashed. 52_FLASH_TIMEOUT = 5.0 53 54 55class TestingFailure(Exception): 56 """A simple exception to be raised when a testing step fails.""" 57 58 59class DeviceNotFound(Exception): 60 """A simple exception to be raised when unable to connect to a device.""" 61 62 63def parse_args(): 64 """Parses command-line arguments.""" 65 66 parser = argparse.ArgumentParser(description=__doc__) 67 parser.add_argument('binary', help='The target test binary to run') 68 parser.add_argument( 69 '--openocd-config', 70 default=_OPENOCD_CONFIG, 71 help='Path to openocd configuration file', 72 ) 73 parser.add_argument( 74 '--stlink-serial', 75 default=None, 76 help='The serial number of the stlink to use when ' 77 'flashing the target device', 78 ) 79 parser.add_argument( 80 '--port', 81 default=None, 82 help='The name of the serial port to connect to when ' 'running tests', 83 ) 84 parser.add_argument( 85 '--baud', 86 type=int, 87 default=115200, 88 help='Target baud rate to use for serial communication' 89 ' with target device', 90 ) 91 parser.add_argument( 92 '--test-timeout', 93 type=float, 94 default=5.0, 95 help='Maximum communication delay in seconds before a ' 96 'test is considered unresponsive and aborted', 97 ) 98 parser.add_argument( 99 '--verbose', 100 '-v', 101 dest='verbose', 102 action="store_true", 103 help='Output additional logs as the script runs', 104 ) 105 106 return parser.parse_args() 107 108 109def log_subprocess_output(level, output): 110 """Logs subprocess output line-by-line.""" 111 112 lines = output.decode('utf-8', errors='replace').splitlines() 113 for line in lines: 114 _LOG.log(level, line) 115 116 117def reset_device(openocd_config, stlink_serial): 118 """Uses openocd to reset the attached device.""" 119 120 # Name/path of openocd. 121 default_flasher = 'openocd' 122 flash_tool = os.getenv('OPENOCD_PATH', default_flasher) 123 124 cmd = [ 125 flash_tool, 126 '-s', 127 _OPENOCD_SCRIPTS_DIR, 128 '-f', 129 openocd_config, 130 '-c', 131 'init', 132 '-c', 133 'reset run', 134 '-c', 135 'exit', 136 ] 137 _LOG.debug('Resetting device') 138 139 env = os.environ.copy() 140 if stlink_serial: 141 env['PW_STLINK_SERIAL'] = stlink_serial 142 143 # Disable GDB port to support multi-device testing. 144 env['PW_GDB_PORT'] = 'disabled' 145 process = subprocess.run( 146 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env 147 ) 148 if process.returncode: 149 log_subprocess_output(logging.ERROR, process.stdout) 150 raise TestingFailure('Failed to reset target device') 151 152 log_subprocess_output(logging.DEBUG, process.stdout) 153 154 _LOG.debug('Successfully reset device') 155 156 157def read_serial(port, baud_rate, test_timeout) -> bytes: 158 """Reads lines from a serial port until a line read times out. 159 160 Returns bytes object containing the read serial data. 161 """ 162 163 serial_data = bytearray() 164 device = serial.Serial( 165 baudrate=baud_rate, port=port, timeout=_FLASH_TIMEOUT 166 ) 167 if not device.is_open: 168 raise TestingFailure('Failed to open device') 169 170 # Flush input buffer and reset the device to begin the test. 171 device.reset_input_buffer() 172 173 # Block and wait for the first byte. 174 serial_data += device.read() 175 if not serial_data: 176 raise TestingFailure('Device not producing output') 177 178 device.timeout = test_timeout 179 180 # Read with a reasonable timeout until we stop getting characters. 181 while True: 182 bytes_read = device.readline() 183 if not bytes_read: 184 break 185 serial_data += bytes_read 186 if serial_data.rfind(_TESTS_DONE_STRING) != -1: 187 # Set to much more aggressive timeout since the last one or two 188 # lines should print out immediately. (one line if all fails or all 189 # passes, two lines if mixed.) 190 device.timeout = 0.01 191 192 # Remove carriage returns. 193 serial_data = serial_data.replace(b'\r', b'') 194 195 # Try to trim captured results to only contain most recent test run. 196 test_start_index = serial_data.rfind(_TESTS_STARTING_STRING) 197 return ( 198 serial_data 199 if test_start_index == -1 200 else serial_data[test_start_index:] 201 ) 202 203 204def flash_device(binary, openocd_config, stlink_serial): 205 """Flash binary to a connected device using the provided configuration.""" 206 207 # Name/path of openocd. 208 default_flasher = 'openocd' 209 flash_tool = os.getenv('OPENOCD_PATH', default_flasher) 210 211 openocd_command = ' '.join(['program', binary, 'reset', 'exit']) 212 cmd = [ 213 flash_tool, 214 '-s', 215 _OPENOCD_SCRIPTS_DIR, 216 '-f', 217 openocd_config, 218 '-c', 219 openocd_command, 220 ] 221 _LOG.info('Flashing firmware to device') 222 223 env = os.environ.copy() 224 if stlink_serial: 225 env['PW_STLINK_SERIAL'] = stlink_serial 226 227 # Disable GDB port to support multi-device testing. 228 env['PW_GDB_PORT'] = 'disabled' 229 process = subprocess.run( 230 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env 231 ) 232 if process.returncode: 233 log_subprocess_output(logging.ERROR, process.stdout) 234 raise TestingFailure('Failed to flash target device') 235 236 log_subprocess_output(logging.DEBUG, process.stdout) 237 238 _LOG.debug('Successfully flashed firmware to device') 239 240 241def handle_test_results(test_output): 242 """Parses test output to determine whether tests passed or failed.""" 243 244 if test_output.find(_TESTS_STARTING_STRING) == -1: 245 raise TestingFailure('Failed to find test start') 246 247 if test_output.rfind(_TESTS_DONE_STRING) == -1: 248 log_subprocess_output(logging.INFO, test_output) 249 raise TestingFailure('Tests did not complete') 250 251 if test_output.rfind(_TEST_FAILURE_STRING) != -1: 252 log_subprocess_output(logging.INFO, test_output) 253 raise TestingFailure('Test suite had one or more failures') 254 255 log_subprocess_output(logging.DEBUG, test_output) 256 257 _LOG.info('Test passed!') 258 259 260def _threaded_test_reader(dest, port, baud_rate, test_timeout): 261 """Parses test output to the mutable "dest" passed to this function.""" 262 dest.append(read_serial(port, baud_rate, test_timeout)) 263 264 265def run_device_test( 266 binary, test_timeout, openocd_config, baud, stlink_serial=None, port=None 267) -> bool: 268 """Flashes, runs, and checks an on-device test binary. 269 270 Returns true on test pass. 271 """ 272 273 if stlink_serial is None and port is None: 274 _LOG.debug('Attempting to automatically detect dev board') 275 boards = stm32f429i_detector.detect_boards() 276 if not boards: 277 error = 'Could not find an attached device' 278 _LOG.error(error) 279 raise DeviceNotFound(error) 280 stlink_serial = boards[0].serial_number 281 port = boards[0].dev_name 282 283 _LOG.debug('Launching test binary %s', binary) 284 try: 285 # Begin capturing test output via another thread BEFORE flashing the 286 # device since the test will automatically run after the image is 287 # flashed. This reduces flake since there isn't a need to time a reset 288 # correctly relative to the start of capturing device output. 289 result: List[bytes] = [] 290 threaded_reader_args = (result, port, baud, test_timeout) 291 read_thread = threading.Thread( 292 target=_threaded_test_reader, args=threaded_reader_args 293 ) 294 read_thread.start() 295 _LOG.info('Running test') 296 flash_device(binary, openocd_config, stlink_serial) 297 read_thread.join() 298 if result: 299 handle_test_results(result[0]) 300 except TestingFailure as err: 301 _LOG.error(err) 302 return False 303 304 return True 305 306 307def main(): 308 """Set up runner, and then flash/run device test.""" 309 args = parse_args() 310 311 # Try to use pw_cli logs, else default to something reasonable. 312 try: 313 import pw_cli.log # pylint: disable=import-outside-toplevel 314 315 log_level = logging.DEBUG if args.verbose else logging.INFO 316 pw_cli.log.install(level=log_level) 317 except ImportError: 318 coloredlogs.install( 319 level='DEBUG' if args.verbose else 'INFO', 320 level_styles={'debug': {'color': 244}, 'error': {'color': 'red'}}, 321 fmt='%(asctime)s %(levelname)s | %(message)s', 322 ) 323 324 if run_device_test( 325 args.binary, 326 args.test_timeout, 327 args.openocd_config, 328 args.baud, 329 args.stlink_serial, 330 args.port, 331 ): 332 sys.exit(0) 333 else: 334 sys.exit(1) 335 336 337if __name__ == '__main__': 338 main() 339