1#!/usr/bin/env python3 2# 3# Working with threading and pySerial 4# 5# This file is part of pySerial. https://github.com/pyserial/pyserial 6# (C) 2015-2016 Chris Liechti <cliechti@gmx.net> 7# 8# SPDX-License-Identifier: BSD-3-Clause 9"""\ 10Support threading with serial ports. 11""" 12from __future__ import absolute_import 13 14import serial 15import threading 16 17 18class Protocol(object): 19 """\ 20 Protocol as used by the ReaderThread. This base class provides empty 21 implementations of all methods. 22 """ 23 24 def connection_made(self, transport): 25 """Called when reader thread is started""" 26 27 def data_received(self, data): 28 """Called with snippets received from the serial port""" 29 30 def connection_lost(self, exc): 31 """\ 32 Called when the serial port is closed or the reader loop terminated 33 otherwise. 34 """ 35 if isinstance(exc, Exception): 36 raise exc 37 38 39class Packetizer(Protocol): 40 """ 41 Read binary packets from serial port. Packets are expected to be terminated 42 with a TERMINATOR byte (null byte by default). 43 44 The class also keeps track of the transport. 45 """ 46 47 TERMINATOR = b'\0' 48 49 def __init__(self): 50 self.buffer = bytearray() 51 self.transport = None 52 53 def connection_made(self, transport): 54 """Store transport""" 55 self.transport = transport 56 57 def connection_lost(self, exc): 58 """Forget transport""" 59 self.transport = None 60 super(Packetizer, self).connection_lost(exc) 61 62 def data_received(self, data): 63 """Buffer received data, find TERMINATOR, call handle_packet""" 64 self.buffer.extend(data) 65 while self.TERMINATOR in self.buffer: 66 packet, self.buffer = self.buffer.split(self.TERMINATOR, 1) 67 self.handle_packet(packet) 68 69 def handle_packet(self, packet): 70 """Process packets - to be overridden by subclassing""" 71 raise NotImplementedError('please implement functionality in handle_packet') 72 73 74class FramedPacket(Protocol): 75 """ 76 Read binary packets. Packets are expected to have a start and stop marker. 77 78 The class also keeps track of the transport. 79 """ 80 81 START = b'(' 82 STOP = b')' 83 84 def __init__(self): 85 self.packet = bytearray() 86 self.in_packet = False 87 self.transport = None 88 89 def connection_made(self, transport): 90 """Store transport""" 91 self.transport = transport 92 93 def connection_lost(self, exc): 94 """Forget transport""" 95 self.transport = None 96 self.in_packet = False 97 del self.packet[:] 98 super(FramedPacket, self).connection_lost(exc) 99 100 def data_received(self, data): 101 """Find data enclosed in START/STOP, call handle_packet""" 102 for byte in serial.iterbytes(data): 103 if byte == self.START: 104 self.in_packet = True 105 elif byte == self.STOP: 106 self.in_packet = False 107 self.handle_packet(bytes(self.packet)) # make read-only copy 108 del self.packet[:] 109 elif self.in_packet: 110 self.packet.extend(byte) 111 else: 112 self.handle_out_of_packet_data(byte) 113 114 def handle_packet(self, packet): 115 """Process packets - to be overridden by subclassing""" 116 raise NotImplementedError('please implement functionality in handle_packet') 117 118 def handle_out_of_packet_data(self, data): 119 """Process data that is received outside of packets""" 120 pass 121 122 123class LineReader(Packetizer): 124 """ 125 Read and write (Unicode) lines from/to serial port. 126 The encoding is applied. 127 """ 128 129 TERMINATOR = b'\r\n' 130 ENCODING = 'utf-8' 131 UNICODE_HANDLING = 'replace' 132 133 def handle_packet(self, packet): 134 self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING)) 135 136 def handle_line(self, line): 137 """Process one line - to be overridden by subclassing""" 138 raise NotImplementedError('please implement functionality in handle_line') 139 140 def write_line(self, text): 141 """ 142 Write text to the transport. ``text`` is a Unicode string and the encoding 143 is applied before sending ans also the newline is append. 144 """ 145 # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call 146 self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR) 147 148 149class ReaderThread(threading.Thread): 150 """\ 151 Implement a serial port read loop and dispatch to a Protocol instance (like 152 the asyncio.Protocol) but do it with threads. 153 154 Calls to close() will close the serial port but it is also possible to just 155 stop() this thread and continue the serial port instance otherwise. 156 """ 157 158 def __init__(self, serial_instance, protocol_factory): 159 """\ 160 Initialize thread. 161 162 Note that the serial_instance' timeout is set to one second! 163 Other settings are not changed. 164 """ 165 super(ReaderThread, self).__init__() 166 self.daemon = True 167 self.serial = serial_instance 168 self.protocol_factory = protocol_factory 169 self.alive = True 170 self._lock = threading.Lock() 171 self._connection_made = threading.Event() 172 self.protocol = None 173 174 def stop(self): 175 """Stop the reader thread""" 176 self.alive = False 177 if hasattr(self.serial, 'cancel_read'): 178 self.serial.cancel_read() 179 self.join(2) 180 181 def run(self): 182 """Reader loop""" 183 if not hasattr(self.serial, 'cancel_read'): 184 self.serial.timeout = 1 185 self.protocol = self.protocol_factory() 186 try: 187 self.protocol.connection_made(self) 188 except Exception as e: 189 self.alive = False 190 self.protocol.connection_lost(e) 191 self._connection_made.set() 192 return 193 error = None 194 self._connection_made.set() 195 while self.alive and self.serial.is_open: 196 try: 197 # read all that is there or wait for one byte (blocking) 198 data = self.serial.read(self.serial.in_waiting or 1) 199 except serial.SerialException as e: 200 # probably some I/O problem such as disconnected USB serial 201 # adapters -> exit 202 error = e 203 break 204 else: 205 if data: 206 # make a separated try-except for called user code 207 try: 208 self.protocol.data_received(data) 209 except Exception as e: 210 error = e 211 break 212 self.alive = False 213 self.protocol.connection_lost(error) 214 self.protocol = None 215 216 def write(self, data): 217 """Thread safe writing (uses lock)""" 218 with self._lock: 219 return self.serial.write(data) 220 221 def close(self): 222 """Close the serial port and exit reader thread (uses lock)""" 223 # use the lock to let other threads finish writing 224 with self._lock: 225 # first stop reading, so that closing can be done on idle port 226 self.stop() 227 self.serial.close() 228 229 def connect(self): 230 """ 231 Wait until connection is set up and return the transport and protocol 232 instances. 233 """ 234 if self.alive: 235 self._connection_made.wait() 236 if not self.alive: 237 raise RuntimeError('connection_lost already called') 238 return (self, self.protocol) 239 else: 240 raise RuntimeError('already stopped') 241 242 # - - context manager, returns protocol 243 244 def __enter__(self): 245 """\ 246 Enter context handler. May raise RuntimeError in case the connection 247 could not be created. 248 """ 249 self.start() 250 self._connection_made.wait() 251 if not self.alive: 252 raise RuntimeError('connection_lost already called') 253 return self.protocol 254 255 def __exit__(self, exc_type, exc_val, exc_tb): 256 """Leave context: close port""" 257 self.close() 258 259 260# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 261# test 262if __name__ == '__main__': 263 # pylint: disable=wrong-import-position 264 import sys 265 import time 266 import traceback 267 268 #~ PORT = 'spy:///dev/ttyUSB0' 269 PORT = 'loop://' 270 271 class PrintLines(LineReader): 272 def connection_made(self, transport): 273 super(PrintLines, self).connection_made(transport) 274 sys.stdout.write('port opened\n') 275 self.write_line('hello world') 276 277 def handle_line(self, data): 278 sys.stdout.write('line received: {!r}\n'.format(data)) 279 280 def connection_lost(self, exc): 281 if exc: 282 traceback.print_exc(exc) 283 sys.stdout.write('port closed\n') 284 285 ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) 286 with ReaderThread(ser, PrintLines) as protocol: 287 protocol.write_line('hello') 288 time.sleep(2) 289 290 # alternative usage 291 ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1) 292 t = ReaderThread(ser, PrintLines) 293 t.start() 294 transport, protocol = t.connect() 295 protocol.write_line('hello') 296 time.sleep(2) 297 t.close() 298