• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! python
2#
# This module implements a special URL handler that wraps another port and
# prints the traffic for debugging purposes. With this, it is possible
# to debug the serial port traffic of every application that uses
# serial_for_url.
7#
8# This file is part of pySerial. https://github.com/pyserial/pyserial
9# (C) 2015 Chris Liechti <cliechti@gmx.net>
10#
11# SPDX-License-Identifier:    BSD-3-Clause
12#
13# URL format:    spy://port[?option[=value][&option[=value]]]
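# options:
# - file=X      a file or device to write to (default: stderr)
# - color       use escape codes to colorize output
# - raw         forward raw bytes instead of hexdump
# - rawlog[=X]  write the repr of the data to the logging module
#               (logger name X, default "serial")
# - log[=X]     write a hexdump to the logging module
#               (logger name X, default "serial")
# - all         also show empty reads and in_waiting queries
#
# example:
#   redirect output to another terminal window on Posix (Linux):
#   python -m serial.tools.miniterm spy:///dev/ttyUSB0?file=/dev/pts/14\&color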
22
23from __future__ import absolute_import
24
25import logging
26import sys
27import time
28
29import serial
30from serial.serialutil import  to_bytes
31
32try:
33    import urlparse
34except ImportError:
35    import urllib.parse as urlparse
36
37
def sixteen(data):
    """\
    Generate (hex, ascii) string pairs for a hex dump with 16 bytes per row.

    Each byte produces its two-digit upper-case hex value followed by a
    space, paired with the character itself when it lies between ' ' and
    '~', or '.' otherwise. A (' ', '') pair is inserted between the two
    groups of eight and (None, None) marks the end of every row. A trailing
    partial row is padded with blanks to the full width before its end
    marker is emitted.
    """
    column = 0
    for byte in serial.iterbytes(data):
        hex_text = '{:02X} '.format(ord(byte))
        if b' ' <= byte < b'\x7f':
            char = byte.decode('ascii')
        else:
            char = '.'
        yield (hex_text, char)
        column += 1
        if column == 8:
            # gap between the two groups of eight
            yield (' ', '')
        elif column >= 16:
            # row complete
            yield (None, None)
            column = 0
    if column > 0:
        # fill the unused slots of the last row so that all rows line up
        for slot in range(column + 1, 17):
            if slot == 8:
                yield (' ', '')
            yield ('   ', ' ')
        yield (None, None)
59
60
def hexdump(data):
    """\
    Yield (offset, text) tuples, one for every row of 16 bytes in data.

    offset is the position of the first byte of the row (0x00, 0x10, ...)
    and text is the hex column and the character column of that row,
    separated by a single space.
    """
    hex_cells = []
    char_cells = []
    row_offset = 0
    for hex_cell, char_cell in sixteen(data):
        if hex_cell is not None:
            hex_cells.append(hex_cell)
            char_cells.append(char_cell)
            continue
        # end-of-row marker: hand out the collected row and start over
        yield (row_offset, ''.join(hex_cells) + ' ' + ''.join(char_cells))
        hex_cells = []
        char_cells = []
        row_offset += 0x10
75
76
class FormatRaw(object):
    """\
    Forward only RX and TX data to output.

    The data is passed on unmodified (no hex dump, no time stamps). When
    color is enabled, an escape sequence selecting the RX or TX color is
    written in front of each chunk. Control calls are not shown.

    output may be a text stream backed by a binary buffer (such as
    sys.stderr or a file opened with mode 'w'), a binary stream, or a pure
    text stream; see _write_data for how the bytes are delivered.
    """

    def __init__(self, output, color):
        """\
        output: stream to write to
        color:  true to prefix each chunk with an ANSI color escape code
        """
        self.output = output
        self.color = color
        self.rx_color = '\x1b[32m'
        self.tx_color = '\x1b[31m'

    def _write_data(self, data):
        """\
        Write the bytes in data to the output stream and flush it.

        Text streams on Python 3 refuse bytes, so the underlying binary
        buffer is used when the stream exposes one. Streams without a
        buffer get the bytes directly; if they only accept text, the data
        is decoded as latin-1, which maps every byte to exactly one
        character.
        """
        binary = getattr(self.output, 'buffer', None)
        if binary is not None:
            # push out pending text (e.g. the color code) first so that it
            # stays in front of the data
            self.output.flush()
            binary.write(data)
        else:
            try:
                self.output.write(data)
            except TypeError:
                self.output.write(data.decode('latin-1'))
        self.output.flush()

    def rx(self, data):
        """show received data"""
        if self.color:
            self.output.write(self.rx_color)
        self._write_data(data)

    def tx(self, data):
        """show transmitted data"""
        if self.color:
            self.output.write(self.tx_color)
        self._write_data(data)

    def control(self, name, value):
        """(do not) show control calls"""
        pass
103
104
class FormatHexdump(object):
    """\
    Write RX and TX data as a hex dump and report control calls.

    Every line starts with the time in seconds since the formatter was
    created, followed by a label. Data lines continue with the offset of
    the row within the chunk, the hex values and the characters.

    output example::

        000000.000 Q-RX reset_input_buffer
        000002.469 RTS  inactive
        000002.773 RTS  active
        000003.001 TX   0000  48 45 4C 4C 4F                                    HELLO
        000003.102 RX   0000  48 45 4C 4C 4F                                    HELLO

    """

    def __init__(self, output, color):
        """\
        output: text stream to write to
        color:  true to write an ANSI color escape code before each entry
        """
        self.output = output
        self.color = color
        self.start_time = time.time()
        self.rx_color = '\x1b[32m'
        self.tx_color = '\x1b[31m'
        self.control_color = '\x1b[37m'

    def write_line(self, timestamp, label, value, value2=''):
        """write one line (time stamp, label, value and value2) and flush"""
        line = '{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2)
        self.output.write(line)
        self.output.flush()

    def rx(self, data):
        """show received data as hex dump, '<empty>' if there is no data"""
        if self.color:
            self.output.write(self.rx_color)
        if not data:
            self.write_line(time.time() - self.start_time, 'RX', '<empty>')
            return
        for offset, row in hexdump(data):
            elapsed = time.time() - self.start_time
            self.write_line(elapsed, 'RX', '{:04X}  '.format(offset), row)

    def tx(self, data):
        """show transmitted data as hex dump"""
        if self.color:
            self.output.write(self.tx_color)
        for offset, row in hexdump(data):
            elapsed = time.time() - self.start_time
            self.write_line(elapsed, 'TX', '{:04X}  '.format(offset), row)

    def control(self, name, value):
        """show control calls, name is used as label of the line"""
        if self.color:
            self.output.write(self.control_color)
        elapsed = time.time() - self.start_time
        self.write_line(elapsed, name, value)
154
155
class FormatLog(object):
    """\
    Send the repr of RX/TX data and the control calls to the logging module.
    All entries are logged with level INFO.
    """

    def __init__(self, output, color):
        """\
        output: name of the logger to use (not a stream)
        color:  ignored
        """
        self.log = logging.getLogger(output)

    def rx(self, data):
        """log received data, empty reads are skipped"""
        if not data:
            return
        message = 'RX {!r}'.format(data)
        self.log.info(message)

    def tx(self, data):
        """log transmitted data"""
        message = 'TX {!r}'.format(data)
        self.log.info(message)

    def control(self, name, value):
        """log control calls"""
        message = '{}: {}'.format(name, value)
        self.log.info(message)
177
178
class FormatLogHex(FormatLog):
    """\
    Send RX/TX data as hex dump to the logging module, one log entry per
    row of 16 bytes. Control calls are logged as in the base class.
    """

    def rx(self, data):
        """log received data as hex dump, empty reads are skipped"""
        if not data:
            return
        for offset, row in hexdump(data):
            position = '{:04X}  '.format(offset)
            self.log.info('RX {}{}'.format(position, row))

    def tx(self, data):
        """log transmitted data as hex dump"""
        for offset, row in hexdump(data):
            position = '{:04X}  '.format(offset)
            self.log.info('TX {}{}'.format(position, row))
194
195
class Serial(serial.Serial):
    """\
    Inherit the native Serial port implementation and wrap all the methods and
    attributes.

    Each wrapped call is reported to self.formatter (one of the Format*
    classes, selected by the options of the spy:// URL) and then delegated
    to the native implementation.
    """
    # pylint: disable=no-member

    def __init__(self, *args, **kwargs):
        """\
        Accepts the same arguments as serial.Serial; the port is given as
        spy:// URL (or None and assigned later through the port property).
        """
        # The defaults have to exist before the base class runs: when an URL
        # is passed to the constructor, the base class assigns ``port``, which
        # goes through from_url() and installs the configured formatter.
        # Assigning the defaults afterwards would wipe that configuration.
        self.formatter = None
        self.show_all = False
        super(Serial, self).__init__(*args, **kwargs)

    @serial.Serial.port.setter
    def port(self, value):
        """translate the spy:// URL and set the wrapped port, None is ignored"""
        if value is not None:
            serial.Serial.port.__set__(self, self.from_url(value))

    def from_url(self, url):
        """\
        Process the options of a spy:// URL and return the name of the
        wrapped port (network location and path of the URL).

        As a side effect self.formatter is replaced and self.show_all may be
        set. Supported options: file=X (dev=X is accepted as an alias as
        older documentation and examples use that spelling), color, raw,
        rawlog[=X], log[=X] and all.

        Raises serial.SerialException if the scheme is not spy or if an
        option is unknown.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException(
                'expected a string in the form '
                '"spy://port[?option[=value][&option[=value]]]": '
                'not starting with spy:// ({!r})'.format(parts.scheme))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        opened = None   # file opened by this call, closed again on error
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option in ('file', 'dev'):
                    opened = output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'rawlog':
                    # for the logging formatters, output is the logger name
                    formatter = FormatLog
                    output = values[0] if values[0] else 'serial'
                elif option == 'log':
                    formatter = FormatLogHex
                    output = values[0] if values[0] else 'serial'
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            if opened is not None:
                # do not leave the output file open when the URL is rejected
                opened.close()
            raise serial.SerialException(
                'expected a string in the form '
                '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])

    def write(self, tx):
        """report the data to the formatter, then write it to the port"""
        tx = to_bytes(tx)
        self.formatter.tx(tx)
        return super(Serial, self).write(tx)

    def read(self, size=1):
        """\
        Read from the port and report the data to the formatter. Empty
        reads are only reported if the option 'all' was given.
        """
        rx = super(Serial, self).read(size)
        if rx or self.show_all:
            self.formatter.rx(rx)
        return rx

    # cancel_read/cancel_write are only wrapped if the native implementation
    # provides them
    if hasattr(serial.Serial, 'cancel_read'):
        def cancel_read(self):
            """report and forward cancel_read"""
            self.formatter.control('Q-RX', 'cancel_read')
            super(Serial, self).cancel_read()

    if hasattr(serial.Serial, 'cancel_write'):
        def cancel_write(self):
            """report and forward cancel_write"""
            self.formatter.control('Q-TX', 'cancel_write')
            super(Serial, self).cancel_write()

    @property
    def in_waiting(self):
        """\
        Return in_waiting of the native implementation; the query is only
        reported if the option 'all' was given.
        """
        n = super(Serial, self).in_waiting
        if self.show_all:
            self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
        return n

    def flush(self):
        """report and forward flush"""
        self.formatter.control('Q-TX', 'flush')
        super(Serial, self).flush()

    def reset_input_buffer(self):
        """report and forward reset_input_buffer"""
        self.formatter.control('Q-RX', 'reset_input_buffer')
        super(Serial, self).reset_input_buffer()

    def reset_output_buffer(self):
        """report and forward reset_output_buffer"""
        self.formatter.control('Q-TX', 'reset_output_buffer')
        super(Serial, self).reset_output_buffer()

    def send_break(self, duration=0.25):
        """report and forward send_break"""
        self.formatter.control('BRK', 'send_break {}s'.format(duration))
        super(Serial, self).send_break(duration)

    @serial.Serial.break_condition.setter
    def break_condition(self, level):
        """report the new break state, then set it"""
        self.formatter.control('BRK', 'active' if level else 'inactive')
        serial.Serial.break_condition.__set__(self, level)

    @serial.Serial.rts.setter
    def rts(self, level):
        """report the new RTS state, then set it"""
        self.formatter.control('RTS', 'active' if level else 'inactive')
        serial.Serial.rts.__set__(self, level)

    @serial.Serial.dtr.setter
    def dtr(self, level):
        """report the new DTR state, then set it"""
        self.formatter.control('DTR', 'active' if level else 'inactive')
        serial.Serial.dtr.__set__(self, level)

    @serial.Serial.cts.getter
    def cts(self):
        """read the CTS state, report and return it"""
        level = super(Serial, self).cts
        self.formatter.control('CTS', 'active' if level else 'inactive')
        return level

    @serial.Serial.dsr.getter
    def dsr(self):
        """read the DSR state, report and return it"""
        level = super(Serial, self).dsr
        self.formatter.control('DSR', 'active' if level else 'inactive')
        return level

    @serial.Serial.ri.getter
    def ri(self):
        """read the RI state, report and return it"""
        level = super(Serial, self).ri
        self.formatter.control('RI', 'active' if level else 'inactive')
        return level

    @serial.Serial.cd.getter
    def cd(self):
        """read the CD state, report and return it"""
        level = super(Serial, self).cd
        self.formatter.control('CD', 'active' if level else 'inactive')
        return level
332
333# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
    # simple manual check: create an unopened port, assign a spy:// URL
    # through the port property and show the resulting object
    spy_port = Serial(None)
    spy_port.port = 'spy:///dev/ttyS0'
    print(spy_port)
338