1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Console for interacting with devices using HDLC. 15 16To start the console, provide a serial port as the --device argument and paths 17or globs for .proto files that define the RPC services to support: 18 19 python -m pw_hdlc.rpc_console --device /dev/ttyUSB0 sample.proto 20 21This starts an IPython console for communicating with the connected device. A 22few variables are predefined in the interactive console. These include: 23 24 rpcs - used to invoke RPCs 25 device - the serial device used for communication 26 client - the pw_rpc.Client 27 protos - protocol buffer messages indexed by proto package 28 29An example echo RPC command: 30 31 rpcs.pw.rpc.EchoService.Echo(msg="hello!") 32""" 33 34import argparse 35import datetime 36import glob 37from inspect import cleandoc 38import logging 39from pathlib import Path 40import sys 41from types import ModuleType 42from typing import ( 43 Any, 44 Collection, 45 Iterable, 46 Iterator, 47 List, 48 Optional, 49 Union, 50) 51import socket 52 53import serial # type: ignore 54 55import pw_cli.log 56import pw_console.python_logging 57from pw_console import PwConsoleEmbed 58from pw_console.pyserial_wrapper import SerialWithLogging 59from pw_console.plugins.bandwidth_toolbar import BandwidthToolbar 60 61from pw_log.proto import log_pb2 62from pw_rpc.console_tools.console import flattened_rpc_completions 63from pw_system.device import Device 64from pw_tokenizer.detokenize import AutoUpdatingDetokenizer 65 66_LOG = logging.getLogger('tools') 67_DEVICE_LOG = logging.getLogger('rpc_device') 68 69PW_RPC_MAX_PACKET_SIZE = 256 70SOCKET_SERVER = 'localhost' 71SOCKET_PORT = 33000 72MKFIFO_MODE = 0o666 73 74 75def _parse_args(): 76 """Parses and returns the command line arguments.""" 77 parser = argparse.ArgumentParser(description=__doc__) 78 group = parser.add_mutually_exclusive_group(required=True) 79 group.add_argument('-d', '--device', help='the serial port to use') 80 parser.add_argument('-b', 81 '--baudrate', 82 type=int, 83 default=115200, 84 help='the baud rate to use') 85 parser.add_argument( 86 '--serial-debug', 87 action='store_true', 88 help=('Enable debug log tracing of all data passed through' 89 'pyserial read and write.')) 90 parser.add_argument( 91 '-o', 92 '--output', 93 type=argparse.FileType('wb'), 94 default=sys.stdout.buffer, 95 help=('The file to which to write device output (HDLC channel 1); ' 96 'provide - or omit for stdout.')) 97 parser.add_argument('--logfile', help='Console debug log file.') 98 group.add_argument('-s', 99 '--socket-addr', 100 type=str, 101 help='use socket to connect to server, type default for\ 102 localhost:33000, or manually input the server address:port') 103 parser.add_argument("--token-databases", 104 metavar='elf_or_token_database', 105 nargs="+", 106 type=Path, 107 help="Path to tokenizer database csv file(s).") 108 parser.add_argument('--config-file', 109 type=Path, 110 help='Path to a pw_console yaml config file.') 111 parser.add_argument('--proto-globs', 112 nargs='+', 113 help='glob pattern for .proto files') 114 parser.add_argument('-v', 115 '--verbose', 116 action='store_true', 117 help='Enables debug logging when set') 118 return parser.parse_args() 119 120 121def _expand_globs(globs: Iterable[str]) -> Iterator[Path]: 122 for pattern in globs: 123 for file in glob.glob(pattern, recursive=True): 124 yield Path(file) 125 126 127def _start_ipython_terminal(device: Device, 128 serial_debug: bool = False, 129 config_file_path: Optional[Path] = None) -> None: 130 """Starts an interactive IPython terminal with preset variables.""" 131 local_variables = dict( 132 client=device.client, 133 device=device, 134 rpcs=device.rpcs, 135 protos=device.client.protos.packages, 136 # Include the active pane logger for creating logs in the repl. 137 DEVICE_LOG=_DEVICE_LOG, 138 LOG=logging.getLogger(), 139 ) 140 141 welcome_message = cleandoc(""" 142 Welcome to the Pigweed Console! 143 144 Help: Press F1 or click the [Help] menu 145 To move focus: Press Shift-Tab or click on a window 146 147 Example Python commands: 148 149 device.rpcs.pw.rpc.EchoService.Echo(msg='hello!') 150 LOG.warning('Message appears in Host Logs window.') 151 DEVICE_LOG.warning('Message appears in Device Logs window.') 152 """) 153 154 client_info = device.info() 155 completions = flattened_rpc_completions([client_info]) 156 157 log_windows = { 158 'Device Logs': [_DEVICE_LOG], 159 'Host Logs': [logging.getLogger()], 160 } 161 if serial_debug: 162 log_windows['Serial Debug'] = [ 163 logging.getLogger('pw_console.serial_debug_logger') 164 ] 165 166 interactive_console = PwConsoleEmbed( 167 global_vars=local_variables, 168 local_vars=None, 169 loggers=log_windows, 170 repl_startup_message=welcome_message, 171 help_text=__doc__, 172 config_file_path=config_file_path, 173 ) 174 interactive_console.hide_windows('Host Logs') 175 interactive_console.add_sentence_completer(completions) 176 if serial_debug: 177 interactive_console.add_bottom_toolbar(BandwidthToolbar()) 178 179 # Setup Python logger propagation 180 interactive_console.setup_python_logging() 181 182 # Don't send device logs to the root logger. 183 _DEVICE_LOG.propagate = False 184 185 interactive_console.embed() 186 187 188class SocketClientImpl: 189 def __init__(self, config: str): 190 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 191 socket_server = '' 192 socket_port = 0 193 194 if config == 'default': 195 socket_server = SOCKET_SERVER 196 socket_port = SOCKET_PORT 197 else: 198 socket_server, socket_port_str = config.split(':') 199 socket_port = int(socket_port_str) 200 self.socket.connect((socket_server, socket_port)) 201 202 def write(self, data: bytes): 203 self.socket.sendall(data) 204 205 def read(self, num_bytes: int = PW_RPC_MAX_PACKET_SIZE): 206 return self.socket.recv(num_bytes) 207 208 209def console(device: str, 210 baudrate: int, 211 proto_globs: Collection[str], 212 token_databases: Collection[Path], 213 socket_addr: str, 214 logfile: str, 215 output: Any, 216 serial_debug: bool = False, 217 config_file: Optional[Path] = None, 218 verbose: bool = False) -> int: 219 """Starts an interactive RPC console for HDLC.""" 220 # argparse.FileType doesn't correctly handle '-' for binary files. 221 if output is sys.stdout: 222 output = sys.stdout.buffer 223 224 if not logfile: 225 # Create a temp logfile to prevent logs from appearing over stdout. This 226 # would corrupt the prompt toolkit UI. 227 logfile = pw_console.python_logging.create_temp_log_file() 228 229 log_level = logging.DEBUG if verbose else logging.INFO 230 pw_cli.log.install(log_level, True, False, logfile) 231 _DEVICE_LOG.setLevel(log_level) 232 _LOG.setLevel(log_level) 233 234 detokenizer = None 235 if token_databases: 236 detokenizer = AutoUpdatingDetokenizer(*token_databases) 237 detokenizer.show_errors = True 238 239 if not proto_globs: 240 proto_globs = ['**/*.proto'] 241 242 protos: List[Union[ModuleType, Path]] = list(_expand_globs(proto_globs)) 243 244 # Append compiled log.proto library to avoid include errors when manually 245 # provided, and shadowing errors due to ordering when the default global 246 # search path is used. 247 protos.append(log_pb2) 248 249 if not protos: 250 _LOG.critical('No .proto files were found with %s', 251 ', '.join(proto_globs)) 252 _LOG.critical('At least one .proto file is required') 253 return 1 254 255 _LOG.debug('Found %d .proto files found with %s', len(protos), 256 ', '.join(proto_globs)) 257 258 serial_impl = serial.Serial 259 if serial_debug: 260 serial_impl = SerialWithLogging 261 262 timestamp_decoder = None 263 if socket_addr is None: 264 serial_device = serial_impl( 265 device, 266 baudrate, 267 timeout=0, # Non-blocking mode 268 ) 269 read = lambda: serial_device.read(8192) 270 write = serial_device.write 271 272 # Overwrite decoder for serial device. 273 def milliseconds_to_string(timestamp): 274 """Parses milliseconds since boot to a human-readable string.""" 275 return str(datetime.timedelta(seconds=timestamp / 1e3))[:-3] 276 277 timestamp_decoder = milliseconds_to_string 278 else: 279 try: 280 socket_device = SocketClientImpl(socket_addr) 281 read = socket_device.read 282 write = socket_device.write 283 except ValueError: 284 _LOG.exception('Failed to initialize socket at %s', socket_addr) 285 return 1 286 287 device_client = Device(1, 288 read, 289 write, 290 protos, 291 detokenizer, 292 timestamp_decoder=timestamp_decoder, 293 rpc_timeout_s=5) 294 295 _start_ipython_terminal(device_client, serial_debug, config_file) 296 return 0 297 298 299def main() -> int: 300 return console(**vars(_parse_args())) 301 302 303if __name__ == '__main__': 304 sys.exit(main()) 305