1#! python 2# 3# This module implements a loop back connection receiving itself what it sent. 4# 5# The purpose of this module is.. well... You can run the unit tests with it. 6# and it was so easy to implement ;-) 7# 8# This file is part of pySerial. https://github.com/pyserial/pyserial 9# (C) 2001-2020 Chris Liechti <cliechti@gmx.net> 10# 11# SPDX-License-Identifier: BSD-3-Clause 12# 13# URL format: loop://[option[/option...]] 14# options: 15# - "debug" print diagnostic messages 16from __future__ import absolute_import 17 18import logging 19import numbers 20import time 21try: 22 import urlparse 23except ImportError: 24 import urllib.parse as urlparse 25try: 26 import queue 27except ImportError: 28 import Queue as queue 29 30from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError 31 32# map log level names to constants. used in from_url() 33LOGGER_LEVELS = { 34 'debug': logging.DEBUG, 35 'info': logging.INFO, 36 'warning': logging.WARNING, 37 'error': logging.ERROR, 38} 39 40 41class Serial(SerialBase): 42 """Serial port implementation that simulates a loop back connection in plain software.""" 43 44 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 45 9600, 19200, 38400, 57600, 115200) 46 47 def __init__(self, *args, **kwargs): 48 self.buffer_size = 4096 49 self.queue = None 50 self.logger = None 51 self._cancel_write = False 52 super(Serial, self).__init__(*args, **kwargs) 53 54 def open(self): 55 """\ 56 Open port with current settings. This may throw a SerialException 57 if the port cannot be opened. 58 """ 59 if self.is_open: 60 raise SerialException("Port is already open.") 61 self.logger = None 62 self.queue = queue.Queue(self.buffer_size) 63 64 if self._port is None: 65 raise SerialException("Port must be configured before it can be used.") 66 # not that there is anything to open, but the function applies the 67 # options found in the URL 68 self.from_url(self.port) 69 70 # not that there anything to configure... 71 self._reconfigure_port() 72 # all things set up get, now a clean start 73 self.is_open = True 74 if not self._dsrdtr: 75 self._update_dtr_state() 76 if not self._rtscts: 77 self._update_rts_state() 78 self.reset_input_buffer() 79 self.reset_output_buffer() 80 81 def close(self): 82 if self.is_open: 83 self.is_open = False 84 try: 85 self.queue.put_nowait(None) 86 except queue.Full: 87 pass 88 super(Serial, self).close() 89 90 def _reconfigure_port(self): 91 """\ 92 Set communication parameters on opened port. For the loop:// 93 protocol all settings are ignored! 94 """ 95 # not that's it of any real use, but it helps in the unit tests 96 if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32: 97 raise ValueError("invalid baudrate: {!r}".format(self._baudrate)) 98 if self.logger: 99 self.logger.info('_reconfigure_port()') 100 101 def from_url(self, url): 102 """extract host and port from an URL string""" 103 parts = urlparse.urlsplit(url) 104 if parts.scheme != "loop": 105 raise SerialException( 106 'expected a string in the form ' 107 '"loop://[?logging={debug|info|warning|error}]": not starting ' 108 'with loop:// ({!r})'.format(parts.scheme)) 109 try: 110 # process options now, directly altering self 111 for option, values in urlparse.parse_qs(parts.query, True).items(): 112 if option == 'logging': 113 logging.basicConfig() # XXX is that good to call it here? 114 self.logger = logging.getLogger('pySerial.loop') 115 self.logger.setLevel(LOGGER_LEVELS[values[0]]) 116 self.logger.debug('enabled logging') 117 else: 118 raise ValueError('unknown option: {!r}'.format(option)) 119 except ValueError as e: 120 raise SerialException( 121 'expected a string in the form ' 122 '"loop://[?logging={debug|info|warning|error}]": {}'.format(e)) 123 124 # - - - - - - - - - - - - - - - - - - - - - - - - 125 126 @property 127 def in_waiting(self): 128 """Return the number of bytes currently in the input buffer.""" 129 if not self.is_open: 130 raise PortNotOpenError() 131 if self.logger: 132 # attention the logged value can differ from return value in 133 # threaded environments... 134 self.logger.debug('in_waiting -> {:d}'.format(self.queue.qsize())) 135 return self.queue.qsize() 136 137 def read(self, size=1): 138 """\ 139 Read size bytes from the serial port. If a timeout is set it may 140 return less characters as requested. With no timeout it will block 141 until the requested number of bytes is read. 142 """ 143 if not self.is_open: 144 raise PortNotOpenError() 145 if self._timeout is not None and self._timeout != 0: 146 timeout = time.time() + self._timeout 147 else: 148 timeout = None 149 data = bytearray() 150 while size > 0 and self.is_open: 151 try: 152 b = self.queue.get(timeout=self._timeout) # XXX inter char timeout 153 except queue.Empty: 154 if self._timeout == 0: 155 break 156 else: 157 if b is not None: 158 data += b 159 size -= 1 160 else: 161 break 162 # check for timeout now, after data has been read. 163 # useful for timeout = 0 (non blocking) read 164 if timeout and time.time() > timeout: 165 if self.logger: 166 self.logger.info('read timeout') 167 break 168 return bytes(data) 169 170 def cancel_read(self): 171 self.queue.put_nowait(None) 172 173 def cancel_write(self): 174 self._cancel_write = True 175 176 def write(self, data): 177 """\ 178 Output the given byte string over the serial port. Can block if the 179 connection is blocked. May raise SerialException if the connection is 180 closed. 181 """ 182 self._cancel_write = False 183 if not self.is_open: 184 raise PortNotOpenError() 185 data = to_bytes(data) 186 # calculate aprox time that would be used to send the data 187 time_used_to_send = 10.0 * len(data) / self._baudrate 188 # when a write timeout is configured check if we would be successful 189 # (not sending anything, not even the part that would have time) 190 if self._write_timeout is not None and time_used_to_send > self._write_timeout: 191 # must wait so that unit test succeeds 192 time_left = self._write_timeout 193 while time_left > 0 and not self._cancel_write: 194 time.sleep(min(time_left, 0.5)) 195 time_left -= 0.5 196 if self._cancel_write: 197 return 0 # XXX 198 raise SerialTimeoutException('Write timeout') 199 for byte in iterbytes(data): 200 self.queue.put(byte, timeout=self._write_timeout) 201 return len(data) 202 203 def reset_input_buffer(self): 204 """Clear input buffer, discarding all that is in the buffer.""" 205 if not self.is_open: 206 raise PortNotOpenError() 207 if self.logger: 208 self.logger.info('reset_input_buffer()') 209 try: 210 while self.queue.qsize(): 211 self.queue.get_nowait() 212 except queue.Empty: 213 pass 214 215 def reset_output_buffer(self): 216 """\ 217 Clear output buffer, aborting the current output and 218 discarding all that is in the buffer. 219 """ 220 if not self.is_open: 221 raise PortNotOpenError() 222 if self.logger: 223 self.logger.info('reset_output_buffer()') 224 try: 225 while self.queue.qsize(): 226 self.queue.get_nowait() 227 except queue.Empty: 228 pass 229 230 @property 231 def out_waiting(self): 232 """Return how many bytes the in the outgoing buffer""" 233 if not self.is_open: 234 raise PortNotOpenError() 235 if self.logger: 236 # attention the logged value can differ from return value in 237 # threaded environments... 238 self.logger.debug('out_waiting -> {:d}'.format(self.queue.qsize())) 239 return self.queue.qsize() 240 241 def _update_break_state(self): 242 """\ 243 Set break: Controls TXD. When active, to transmitting is 244 possible. 245 """ 246 if self.logger: 247 self.logger.info('_update_break_state({!r})'.format(self._break_state)) 248 249 def _update_rts_state(self): 250 """Set terminal status line: Request To Send""" 251 if self.logger: 252 self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state)) 253 254 def _update_dtr_state(self): 255 """Set terminal status line: Data Terminal Ready""" 256 if self.logger: 257 self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state)) 258 259 @property 260 def cts(self): 261 """Read terminal status line: Clear To Send""" 262 if not self.is_open: 263 raise PortNotOpenError() 264 if self.logger: 265 self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state)) 266 return self._rts_state 267 268 @property 269 def dsr(self): 270 """Read terminal status line: Data Set Ready""" 271 if self.logger: 272 self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state)) 273 return self._dtr_state 274 275 @property 276 def ri(self): 277 """Read terminal status line: Ring Indicator""" 278 if not self.is_open: 279 raise PortNotOpenError() 280 if self.logger: 281 self.logger.info('returning dummy for RI') 282 return False 283 284 @property 285 def cd(self): 286 """Read terminal status line: Carrier Detect""" 287 if not self.is_open: 288 raise PortNotOpenError() 289 if self.logger: 290 self.logger.info('returning dummy for CD') 291 return True 292 293 # - - - platform specific - - - 294 # None so far 295 296 297# simple client test 298if __name__ == '__main__': 299 import sys 300 s = Serial('loop://') 301 sys.stdout.write('{}\n'.format(s)) 302 303 sys.stdout.write("write...\n") 304 s.write("hello\n") 305 s.flush() 306 sys.stdout.write("read: {!r}\n".format(s.read(5))) 307 308 s.close() 309