• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! python
2#
3# This module implements a loop back connection receiving itself what it sent.
4#
5# The purpose of this module is.. well... You can run the unit tests with it.
6# and it was so easy to implement ;-)
7#
8# This file is part of pySerial. https://github.com/pyserial/pyserial
9# (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
10#
11# SPDX-License-Identifier:    BSD-3-Clause
12#
# URL format:    loop://[?logging={debug|info|warning|error}]
# options:
# - "logging" select the log level and print diagnostic messages
16from __future__ import absolute_import
17
18import logging
19import numbers
20import time
21try:
22    import urlparse
23except ImportError:
24    import urllib.parse as urlparse
25try:
26    import queue
27except ImportError:
28    import Queue as queue
29
30from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError
31
# Lookup table used by from_url(): translates the value of the "logging"
# URL option into the matching constant of the logging module.
LOGGER_LEVELS = dict(
    debug=logging.DEBUG,
    info=logging.INFO,
    warning=logging.WARNING,
    error=logging.ERROR,
)
39
40
class Serial(SerialBase):
    """\
    Serial port implementation that simulates a loop back connection in
    plain software.

    Every byte passed to write() is put on one bounded queue and handed
    back by read(). Input and output buffer are therefore the same object.
    """

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def __init__(self, *args, **kwargs):
        """\
        Set up the loop back state and forward all arguments to SerialBase.
        """
        # These attributes are assigned before the base class initializer
        # runs; presumably it may call open() when a port is given.
        self.buffer_size = 4096     # capacity of the loop back queue
        self.queue = None           # created by open()
        self.logger = None          # set by from_url() if logging is enabled
        self._cancel_write = False
        super(Serial, self).__init__(*args, **kwargs)

    def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened.
        """
        if self.is_open:
            raise SerialException("Port is already open.")
        # validate before touching any state
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        self.logger = None
        self.queue = queue.Queue(self.buffer_size)
        # not that there is anything to open, but the function applies the
        # options found in the URL
        self.from_url(self.port)

        # not that there is anything to configure...
        self._reconfigure_port()
        # everything is set up, now get a clean start
        self.is_open = True
        if not self._dsrdtr:
            self._update_dtr_state()
        if not self._rtscts:
            self._update_rts_state()
        self.reset_input_buffer()
        self.reset_output_buffer()

    def close(self):
        """\
        Close the port. A None sentinel is queued so that a read() blocked
        on the queue wakes up and returns.
        """
        if self.is_open:
            self.is_open = False
            try:
                self.queue.put_nowait(None)
            except queue.Full:
                # a full queue means no reader is blocked waiting for data
                pass
        super(Serial, self).close()

    def _reconfigure_port(self):
        """\
        Set communication parameters on opened port. For the loop://
        protocol all settings are ignored!

        Raises ValueError if the baudrate is not an integer in the range
        1 .. 2**32 - 1.
        """
        # not that it is of any real use, but it helps in the unit tests
        if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32:
            raise ValueError("invalid baudrate: {!r}".format(self._baudrate))
        if self.logger:
            self.logger.info('_reconfigure_port()')

    def from_url(self, url):
        """\
        Apply the options found in a loop:// URL to this instance.

        The only supported option is ``logging=<level>`` in the query
        string, where <level> is one of the keys of LOGGER_LEVELS.

        Raises SerialException if the scheme is not "loop", if an unknown
        option is given or if the logging level is not recognized.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    level_name = values[0]
                    # Check the level first: a plain dictionary lookup would
                    # raise KeyError, which the handler below does not
                    # convert into a SerialException.
                    if level_name not in LOGGER_LEVELS:
                        raise ValueError('unknown logging level: {!r}'.format(level_name))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[level_name])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -

    def _discard_queued_data(self):
        """Remove everything that is currently stored in the queue."""
        try:
            while self.queue.qsize():
                self.queue.get_nowait()
        except queue.Empty:
            # another consumer emptied the queue in the meantime
            pass

    @property
    def in_waiting(self):
        """Return the number of bytes currently in the input buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        # sample once so that the logged and the returned value agree
        pending = self.queue.qsize()
        if self.logger:
            self.logger.debug('in_waiting -> {:d}'.format(pending))
        return pending

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.

        The read also ends early when the port is closed or cancel_read()
        is called. Raises PortNotOpenError if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self._timeout is not None and self._timeout != 0:
            deadline = time.time() + self._timeout
        else:
            deadline = None
        data = bytearray()
        while size > 0 and self.is_open:
            if deadline is None:
                # None blocks until data arrives, 0 polls without waiting
                wait = self._timeout
            else:
                # wait only for what is left of the overall timeout, not
                # for the full timeout on every single byte
                wait = max(0.0, deadline - time.time())
            try:
                b = self.queue.get(timeout=wait)
            except queue.Empty:
                if self._timeout == 0:
                    break
            else:
                if b is not None:
                    data += b
                    size -= 1
                else:
                    # None is the sentinel queued by close() / cancel_read()
                    break
            # check for timeout now, after data has been read.
            if deadline is not None and time.time() > deadline:
                if self.logger:
                    self.logger.info('read timeout')
                break
        return bytes(data)

    def cancel_read(self):
        """Wake up a read() that is blocked waiting for data."""
        try:
            self.queue.put_nowait(None)
        except queue.Full:
            # a full queue means no read() can be blocked on it
            pass

    def cancel_write(self):
        """Ask a write() that is waiting for its timeout to stop."""
        self._cancel_write = True

    def write(self, data):
        """\
        Output the given byte string over the serial port. Can block if the
        connection is blocked. May raise SerialException if the connection is
        closed.

        Returns the number of bytes written, or 0 if the write was
        cancelled. Raises SerialTimeoutException if a write timeout is set
        and the data can not be sent within that time.
        """
        self._cancel_write = False
        if not self.is_open:
            raise PortNotOpenError()
        data = to_bytes(data)
        # calculate approx. time that would be used to send the data
        time_used_to_send = 10.0 * len(data) / self._baudrate
        # when a write timeout is configured check if we would be successful
        # (not sending anything, not even the part that would have time)
        if self._write_timeout is not None and time_used_to_send > self._write_timeout:
            # must wait so that unit test succeeds
            time_left = self._write_timeout
            while time_left > 0 and not self._cancel_write:
                time.sleep(min(time_left, 0.5))
                time_left -= 0.5
            if self._cancel_write:
                return 0  # XXX
            raise SerialTimeoutException('Write timeout')
        for byte in iterbytes(data):
            try:
                self.queue.put(byte, timeout=self._write_timeout)
            except queue.Full:
                # The buffer stayed full for the whole write timeout. Report
                # it the serial way instead of leaking queue.Full. Bytes
                # queued before this point remain in the buffer.
                raise SerialTimeoutException('Write timeout')
        return len(data)

    def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('reset_input_buffer()')
        self._discard_queued_data()

    def reset_output_buffer(self):
        """\
        Clear output buffer, aborting the current output and
        discarding all that is in the buffer.
        """
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('reset_output_buffer()')
        # the loop back uses one queue for both directions
        self._discard_queued_data()

    @property
    def out_waiting(self):
        """Return how many bytes are in the outgoing buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        # sample once so that the logged and the returned value agree
        pending = self.queue.qsize()
        if self.logger:
            self.logger.debug('out_waiting -> {:d}'.format(pending))
        return pending

    def _update_break_state(self):
        """\
        Set break: Controls TXD. When active, no transmitting is
        possible. The loop back only logs the new state.
        """
        if self.logger:
            self.logger.info('_update_break_state({!r})'.format(self._break_state))

    def _update_rts_state(self):
        """Set terminal status line: Request To Send"""
        if self.logger:
            self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state))

    def _update_dtr_state(self):
        """Set terminal status line: Data Terminal Ready"""
        if self.logger:
            self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state))

    @property
    def cts(self):
        """Read terminal status line: Clear To Send (mirrors RTS)"""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state))
        return self._rts_state

    @property
    def dsr(self):
        """Read terminal status line: Data Set Ready (mirrors DTR)"""
        # same guard as the other status line properties
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state))
        return self._dtr_state

    @property
    def ri(self):
        """Read terminal status line: Ring Indicator (always False)"""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for RI')
        return False

    @property
    def cd(self):
        """Read terminal status line: Carrier Detect (always True)"""
        if not self.is_open:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for CD')
        return True

    # - - - platform specific - - -
    # None so far
295
296
# simple client test: write a few bytes to the loop back port and read
# them back again
if __name__ == '__main__':
    import sys
    s = Serial('loop://')
    try:
        sys.stdout.write('{}\n'.format(s))

        sys.stdout.write("write...\n")
        # write() needs bytes; pySerial rejects text strings on Python 3
        s.write(b"hello\n")
        s.flush()
        sys.stdout.write("read: {!r}\n".format(s.read(5)))
    finally:
        # release the port even if one of the calls above fails
        s.close()
309