#! python
#
# This module implements a special URL handler that wraps another port and
# prints the traffic for debugging purposes. With this, it is possible
# to debug the serial port traffic on every application that uses
# serial_for_url.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
#
# URL format: spy://port[?option[=value][&option[=value]]]
# options:
# - file=X          a file or device to write to (dev=X is accepted as alias)
# - color           use escape code to colorize output
# - raw             forward raw bytes instead of hexdump
# - rawlog[=NAME]   send repr() of the data to the logging module
# - log[=NAME]      send a hexdump of the data to the logging module
#                   (logger NAME defaults to 'serial' for both log options)
# - all             also show empty reads and in_waiting queries
#
# example:
#   redirect output to another terminal window on Posix (Linux):
#   python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color

from __future__ import absolute_import

import logging
import sys
import time

import serial
from serial.serialutil import to_bytes

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse


def sixteen(data):
    """\
    Yield (hex, ASCII) display cells for data, grouped in rows of 16 bytes.

    For every byte a tuple ``('XX ', char)`` is produced, where char is the
    printable ASCII character or ``'.'``. After the 8th byte of a row an extra
    ``(' ', '')`` separator cell is produced. ``(None, None)`` marks the end
    of each row; an incomplete last row is padded with blank cells first so
    that every row has the same width.
    """
    n = 0
    for b in serial.iterbytes(data):
        yield ('{:02X} '.format(ord(b)), b.decode('ascii') if b' ' <= b < b'\x7f' else '.')
        n += 1
        if n == 8:
            yield (' ', '')
        elif n >= 16:
            yield (None, None)
            n = 0
    if n > 0:
        # pad the incomplete row up to 16 cells
        while n < 16:
            n += 1
            if n == 8:
                yield (' ', '')
            # a blank cell must be as wide as a hex cell ('XX ' = 3 chars),
            # otherwise the ASCII column of a short row does not line up
            yield ('   ', ' ')
        yield (None, None)


def hexdump(data):
    """\
    Yield ``(offset, line)`` tuples with a hexdump of data.

    offset is the index of the first byte of the row (0, 16, 32, ...), line
    is the text of that row: all hex cells, one space, all ASCII cells.
    Nothing is yielded for empty data.
    """
    hex_cells = []
    text_cells = []
    offset = 0
    for h, a in sixteen(data):
        if h is None:
            # end-of-row marker: emit the collected row and start a new one
            yield (offset, ' '.join([''.join(hex_cells), ''.join(text_cells)]))
            del hex_cells[:]
            del text_cells[:]
            offset += 0x10
        else:
            hex_cells.append(h)
            text_cells.append(a)


class FormatRaw(object):
    """Forward only RX and TX data to output."""

    def __init__(self, output, color):
        """\
        output is the stream to write to, color is a flag that enables an
        escape sequence in front of each chunk (green for RX, red for TX).
        """
        self.output = output
        self.color = color
        self.rx_color = '\x1b[32m'
        self.tx_color = '\x1b[31m'

    def _forward(self, color_code, data):
        """write the optional color escape, then the unmodified payload"""
        if self.color:
            self.output.write(color_code)
        # Text mode streams on Python 3 (sys.stderr, open(..., 'w')) raise
        # TypeError when given bytes; they expose their binary stream as
        # ``buffer``. Streams without that attribute get the data directly.
        binary = getattr(self.output, 'buffer', None)
        if binary is None:
            self.output.write(data)
        else:
            # flush the text layer first so the escape sequence stays in
            # front of the payload
            self.output.flush()
            binary.write(data)
        self.output.flush()

    def rx(self, data):
        """show received data"""
        self._forward(self.rx_color, data)

    def tx(self, data):
        """show transmitted data"""
        self._forward(self.tx_color, data)

    def control(self, name, value):
        """(do not) show control calls"""
        pass


class FormatHexdump(object):
    """\
    Create a hex dump of RX and TX data, show when control lines are read or
    written.

    Each line starts with the time in seconds since this formatter was
    created, followed by a label and the value.

    output example (column spacing is illustrative)::

        000000.000 Q-RX reset_input_buffer
        000002.469 RTS  inactive
        000002.773 RTS  active
        000003.001 TX   0000 48 45 4C 4C 4F          HELLO
        000003.102 RX   0000 48 45 4C 4C 4F          HELLO

    """

    def __init__(self, output, color):
        """\
        output is a text stream, color is a flag that enables an escape
        sequence in front of each entry (green RX, red TX, white control).
        """
        self.start_time = time.time()
        self.output = output
        self.color = color
        self.rx_color = '\x1b[32m'
        self.tx_color = '\x1b[31m'
        self.control_color = '\x1b[37m'

    def write_line(self, timestamp, label, value, value2=''):
        """write one formatted line to the output and flush it"""
        self.output.write('{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2))
        self.output.flush()

    def rx(self, data):
        """show received data as hex dump"""
        if self.color:
            self.output.write(self.rx_color)
        if data:
            for offset, row in hexdump(data):
                self.write_line(time.time() - self.start_time, 'RX', '{:04X} '.format(offset), row)
        else:
            # empty reads are reported explicitly (hexdump would be silent)
            self.write_line(time.time() - self.start_time, 'RX', '<empty>')

    def tx(self, data):
        """show transmitted data as hex dump"""
        if self.color:
            self.output.write(self.tx_color)
        for offset, row in hexdump(data):
            self.write_line(time.time() - self.start_time, 'TX', '{:04X} '.format(offset), row)

    def control(self, name, value):
        """show control calls"""
        if self.color:
            self.output.write(self.control_color)
        self.write_line(time.time() - self.start_time, name, value)


class FormatLog(object):
    """\
    Write data to logging module (repr() of the data, level INFO).
    """

    def __init__(self, output, color):
        """output is the name of the logger to use, color is ignored"""
        self.log = logging.getLogger(output)

    def rx(self, data):
        """show received data (empty reads are not logged)"""
        if data:
            self.log.info('RX {!r}'.format(data))

    def tx(self, data):
        """show transmitted data"""
        self.log.info('TX {!r}'.format(data))

    def control(self, name, value):
        """show control calls"""
        self.log.info('{}: {}'.format(name, value))


class FormatLogHex(FormatLog):
    """\
    Write data to logging module as hex dump, one log entry per row.
    Control calls are logged as in FormatLog.
    """

    def rx(self, data):
        """show received data (empty reads are not logged)"""
        if data:
            for offset, row in hexdump(data):
                self.log.info('RX {}{}'.format('{:04X} '.format(offset), row))

    def tx(self, data):
        """show transmitted data"""
        for offset, row in hexdump(data):
            self.log.info('TX {}{}'.format('{:04X} '.format(offset), row))


class Serial(serial.Serial):
    """\
    Inherit the native Serial port implementation and wrap all the methods and
    attributes, reporting each use to the formatter selected by the URL.
    """
    # pylint: disable=no-member

    def __init__(self, *args, **kwargs):
        # Set the defaults first: the base class constructor assigns ``port``,
        # which runs from_url() and selects formatter / show_all. Resetting
        # them afterwards would discard the options of a URL that was passed
        # directly to the constructor.
        self.formatter = None
        self.show_all = False
        super(Serial, self).__init__(*args, **kwargs)

    @serial.Serial.port.setter
    def port(self, value):
        # accept an spy:// URL and hand the wrapped port name to the base class
        if value is not None:
            serial.Serial.port.__set__(self, self.from_url(value))

    def from_url(self, url):
        """\
        Parse an spy:// URL, configure the formatter from its options and
        return the name of the wrapped port (netloc + path of the URL).

        Raises serial.SerialException if the scheme is not ``spy`` or an
        unknown option is given.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException(
                'expected a string in the form '
                '"spy://port[?option[=value][&option[=value]]]": '
                'not starting with spy:// ({!r})'.format(parts.scheme))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            # keep_blank_values=True: options without value show up as ['']
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option in ('file', 'dev'):
                    # 'dev' is the spelling used in the module header
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'rawlog':
                    formatter = FormatLog
                    output = values[0] if values[0] else 'serial'
                elif option == 'log':
                    formatter = FormatLogHex
                    output = values[0] if values[0] else 'serial'
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise serial.SerialException(
                'expected a string in the form '
                '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])

    def write(self, tx):
        """report the data to the formatter, then write it to the port"""
        tx = to_bytes(tx)
        self.formatter.tx(tx)
        return super(Serial, self).write(tx)

    def read(self, size=1):
        """read from the port and report the data to the formatter"""
        rx = super(Serial, self).read(size)
        # empty reads are only reported when the 'all' option was given
        if rx or self.show_all:
            self.formatter.rx(rx)
        return rx

    # cancel_read / cancel_write are only wrapped when the native
    # implementation provides them
    if hasattr(serial.Serial, 'cancel_read'):
        def cancel_read(self):
            self.formatter.control('Q-RX', 'cancel_read')
            super(Serial, self).cancel_read()

    if hasattr(serial.Serial, 'cancel_write'):
        def cancel_write(self):
            self.formatter.control('Q-TX', 'cancel_write')
            super(Serial, self).cancel_write()

    @property
    def in_waiting(self):
        n = super(Serial, self).in_waiting
        # only reported when the 'all' option was given
        if self.show_all:
            self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
        return n

    def flush(self):
        self.formatter.control('Q-TX', 'flush')
        super(Serial, self).flush()

    def reset_input_buffer(self):
        self.formatter.control('Q-RX', 'reset_input_buffer')
        super(Serial, self).reset_input_buffer()

    def reset_output_buffer(self):
        self.formatter.control('Q-TX', 'reset_output_buffer')
        super(Serial, self).reset_output_buffer()

    def send_break(self, duration=0.25):
        self.formatter.control('BRK', 'send_break {}s'.format(duration))
        super(Serial, self).send_break(duration)

    # output lines: report the new level, then apply it

    @serial.Serial.break_condition.setter
    def break_condition(self, level):
        self.formatter.control('BRK', 'active' if level else 'inactive')
        serial.Serial.break_condition.__set__(self, level)

    @serial.Serial.rts.setter
    def rts(self, level):
        self.formatter.control('RTS', 'active' if level else 'inactive')
        serial.Serial.rts.__set__(self, level)

    @serial.Serial.dtr.setter
    def dtr(self, level):
        self.formatter.control('DTR', 'active' if level else 'inactive')
        serial.Serial.dtr.__set__(self, level)

    # input lines: read the level, report it, return it

    @serial.Serial.cts.getter
    def cts(self):
        level = super(Serial, self).cts
        self.formatter.control('CTS', 'active' if level else 'inactive')
        return level

    @serial.Serial.dsr.getter
    def dsr(self):
        level = super(Serial, self).dsr
        self.formatter.control('DSR', 'active' if level else 'inactive')
        return level

    @serial.Serial.ri.getter
    def ri(self):
        level = super(Serial, self).ri
        self.formatter.control('RI', 'active' if level else 'inactive')
        return level

    @serial.Serial.cd.getter
    def cd(self):
        level = super(Serial, self).cd
        self.formatter.control('CD', 'active' if level else 'inactive')
        return level


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
    ser = Serial(None)
    ser.port = 'spy:///dev/ttyS0'
    print(ser)