• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Working with threading and pySerial
4#
5# This file is part of pySerial. https://github.com/pyserial/pyserial
6# (C) 2015-2016 Chris Liechti <cliechti@gmx.net>
7#
8# SPDX-License-Identifier:    BSD-3-Clause
9"""\
10Support threading with serial ports.
11"""
12from __future__ import absolute_import
13
14import serial
15import threading
16
17
18class Protocol(object):
19    """\
20    Protocol as used by the ReaderThread. This base class provides empty
21    implementations of all methods.
22    """
23
24    def connection_made(self, transport):
25        """Called when reader thread is started"""
26
27    def data_received(self, data):
28        """Called with snippets received from the serial port"""
29
30    def connection_lost(self, exc):
31        """\
32        Called when the serial port is closed or the reader loop terminated
33        otherwise.
34        """
35        if isinstance(exc, Exception):
36            raise exc
37
38
39class Packetizer(Protocol):
40    """
41    Read binary packets from serial port. Packets are expected to be terminated
42    with a TERMINATOR byte (null byte by default).
43
44    The class also keeps track of the transport.
45    """
46
47    TERMINATOR = b'\0'
48
49    def __init__(self):
50        self.buffer = bytearray()
51        self.transport = None
52
53    def connection_made(self, transport):
54        """Store transport"""
55        self.transport = transport
56
57    def connection_lost(self, exc):
58        """Forget transport"""
59        self.transport = None
60        super(Packetizer, self).connection_lost(exc)
61
62    def data_received(self, data):
63        """Buffer received data, find TERMINATOR, call handle_packet"""
64        self.buffer.extend(data)
65        while self.TERMINATOR in self.buffer:
66            packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
67            self.handle_packet(packet)
68
69    def handle_packet(self, packet):
70        """Process packets - to be overridden by subclassing"""
71        raise NotImplementedError('please implement functionality in handle_packet')
72
73
74class FramedPacket(Protocol):
75    """
76    Read binary packets. Packets are expected to have a start and stop marker.
77
78    The class also keeps track of the transport.
79    """
80
81    START = b'('
82    STOP = b')'
83
84    def __init__(self):
85        self.packet = bytearray()
86        self.in_packet = False
87        self.transport = None
88
89    def connection_made(self, transport):
90        """Store transport"""
91        self.transport = transport
92
93    def connection_lost(self, exc):
94        """Forget transport"""
95        self.transport = None
96        self.in_packet = False
97        del self.packet[:]
98        super(FramedPacket, self).connection_lost(exc)
99
100    def data_received(self, data):
101        """Find data enclosed in START/STOP, call handle_packet"""
102        for byte in serial.iterbytes(data):
103            if byte == self.START:
104                self.in_packet = True
105            elif byte == self.STOP:
106                self.in_packet = False
107                self.handle_packet(bytes(self.packet)) # make read-only copy
108                del self.packet[:]
109            elif self.in_packet:
110                self.packet.extend(byte)
111            else:
112                self.handle_out_of_packet_data(byte)
113
114    def handle_packet(self, packet):
115        """Process packets - to be overridden by subclassing"""
116        raise NotImplementedError('please implement functionality in handle_packet')
117
118    def handle_out_of_packet_data(self, data):
119        """Process data that is received outside of packets"""
120        pass
121
122
123class LineReader(Packetizer):
124    """
125    Read and write (Unicode) lines from/to serial port.
126    The encoding is applied.
127    """
128
129    TERMINATOR = b'\r\n'
130    ENCODING = 'utf-8'
131    UNICODE_HANDLING = 'replace'
132
133    def handle_packet(self, packet):
134        self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))
135
136    def handle_line(self, line):
137        """Process one line - to be overridden by subclassing"""
138        raise NotImplementedError('please implement functionality in handle_line')
139
140    def write_line(self, text):
141        """
142        Write text to the transport. ``text`` is a Unicode string and the encoding
143        is applied before sending ans also the newline is append.
144        """
145        # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
146        self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
147
148
149class ReaderThread(threading.Thread):
150    """\
151    Implement a serial port read loop and dispatch to a Protocol instance (like
152    the asyncio.Protocol) but do it with threads.
153
154    Calls to close() will close the serial port but it is also possible to just
155    stop() this thread and continue the serial port instance otherwise.
156    """
157
158    def __init__(self, serial_instance, protocol_factory):
159        """\
160        Initialize thread.
161
162        Note that the serial_instance' timeout is set to one second!
163        Other settings are not changed.
164        """
165        super(ReaderThread, self).__init__()
166        self.daemon = True
167        self.serial = serial_instance
168        self.protocol_factory = protocol_factory
169        self.alive = True
170        self._lock = threading.Lock()
171        self._connection_made = threading.Event()
172        self.protocol = None
173
174    def stop(self):
175        """Stop the reader thread"""
176        self.alive = False
177        if hasattr(self.serial, 'cancel_read'):
178            self.serial.cancel_read()
179        self.join(2)
180
181    def run(self):
182        """Reader loop"""
183        if not hasattr(self.serial, 'cancel_read'):
184            self.serial.timeout = 1
185        self.protocol = self.protocol_factory()
186        try:
187            self.protocol.connection_made(self)
188        except Exception as e:
189            self.alive = False
190            self.protocol.connection_lost(e)
191            self._connection_made.set()
192            return
193        error = None
194        self._connection_made.set()
195        while self.alive and self.serial.is_open:
196            try:
197                # read all that is there or wait for one byte (blocking)
198                data = self.serial.read(self.serial.in_waiting or 1)
199            except serial.SerialException as e:
200                # probably some I/O problem such as disconnected USB serial
201                # adapters -> exit
202                error = e
203                break
204            else:
205                if data:
206                    # make a separated try-except for called user code
207                    try:
208                        self.protocol.data_received(data)
209                    except Exception as e:
210                        error = e
211                        break
212        self.alive = False
213        self.protocol.connection_lost(error)
214        self.protocol = None
215
216    def write(self, data):
217        """Thread safe writing (uses lock)"""
218        with self._lock:
219            return self.serial.write(data)
220
221    def close(self):
222        """Close the serial port and exit reader thread (uses lock)"""
223        # use the lock to let other threads finish writing
224        with self._lock:
225            # first stop reading, so that closing can be done on idle port
226            self.stop()
227            self.serial.close()
228
229    def connect(self):
230        """
231        Wait until connection is set up and return the transport and protocol
232        instances.
233        """
234        if self.alive:
235            self._connection_made.wait()
236            if not self.alive:
237                raise RuntimeError('connection_lost already called')
238            return (self, self.protocol)
239        else:
240            raise RuntimeError('already stopped')
241
242    # - -  context manager, returns protocol
243
244    def __enter__(self):
245        """\
246        Enter context handler. May raise RuntimeError in case the connection
247        could not be created.
248        """
249        self.start()
250        self._connection_made.wait()
251        if not self.alive:
252            raise RuntimeError('connection_lost already called')
253        return self.protocol
254
255    def __exit__(self, exc_type, exc_val, exc_tb):
256        """Leave context: close port"""
257        self.close()
258
259
260# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
261# test
262if __name__ == '__main__':
263    # pylint: disable=wrong-import-position
264    import sys
265    import time
266    import traceback
267
268    #~ PORT = 'spy:///dev/ttyUSB0'
269    PORT = 'loop://'
270
271    class PrintLines(LineReader):
272        def connection_made(self, transport):
273            super(PrintLines, self).connection_made(transport)
274            sys.stdout.write('port opened\n')
275            self.write_line('hello world')
276
277        def handle_line(self, data):
278            sys.stdout.write('line received: {!r}\n'.format(data))
279
280        def connection_lost(self, exc):
281            if exc:
282                traceback.print_exc(exc)
283            sys.stdout.write('port closed\n')
284
285    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
286    with ReaderThread(ser, PrintLines) as protocol:
287        protocol.write_line('hello')
288        time.sleep(2)
289
290    # alternative usage
291    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
292    t = ReaderThread(ser, PrintLines)
293    t.start()
294    transport, protocol = t.connect()
295    protocol.write_line('hello')
296    time.sleep(2)
297    t.close()
298