• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#! python
2#
3# Backend for .NET/Mono (IronPython), .NET >= 2
4#
5# This file is part of pySerial. https://github.com/pyserial/pyserial
6# (C) 2008-2015 Chris Liechti <cliechti@gmx.net>
7#
8# SPDX-License-Identifier:    BSD-3-Clause
9
10from __future__ import absolute_import
11
12import System
13import System.IO.Ports
14from serial.serialutil import *
15
16# must invoke function with byte array, make a helper to convert strings
17# to byte arrays
18sab = System.Array[System.Byte]
19
20
21def as_byte_array(string):
22    return sab([ord(x) for x in string])  # XXX will require adaption when run with a 3.x compatible IronPython
23
24
25class Serial(SerialBase):
26    """Serial port implementation for .NET/Mono."""
27
28    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
29                 9600, 19200, 38400, 57600, 115200)
30
31    def open(self):
32        """\
33        Open port with current settings. This may throw a SerialException
34        if the port cannot be opened.
35        """
36        if self._port is None:
37            raise SerialException("Port must be configured before it can be used.")
38        if self.is_open:
39            raise SerialException("Port is already open.")
40        try:
41            self._port_handle = System.IO.Ports.SerialPort(self.portstr)
42        except Exception as msg:
43            self._port_handle = None
44            raise SerialException("could not open port %s: %s" % (self.portstr, msg))
45
46        # if RTS and/or DTR are not set before open, they default to True
47        if self._rts_state is None:
48            self._rts_state = True
49        if self._dtr_state is None:
50            self._dtr_state = True
51
52        self._reconfigure_port()
53        self._port_handle.Open()
54        self.is_open = True
55        if not self._dsrdtr:
56            self._update_dtr_state()
57        if not self._rtscts:
58            self._update_rts_state()
59        self.reset_input_buffer()
60
61    def _reconfigure_port(self):
62        """Set communication parameters on opened port."""
63        if not self._port_handle:
64            raise SerialException("Can only operate on a valid port handle")
65
66        #~ self._port_handle.ReceivedBytesThreshold = 1
67
68        if self._timeout is None:
69            self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
70        else:
71            self._port_handle.ReadTimeout = int(self._timeout * 1000)
72
73        # if self._timeout != 0 and self._interCharTimeout is not None:
74            # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
75
76        if self._write_timeout is None:
77            self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
78        else:
79            self._port_handle.WriteTimeout = int(self._write_timeout * 1000)
80
81        # Setup the connection info.
82        try:
83            self._port_handle.BaudRate = self._baudrate
84        except IOError as e:
85            # catch errors from illegal baudrate settings
86            raise ValueError(str(e))
87
88        if self._bytesize == FIVEBITS:
89            self._port_handle.DataBits = 5
90        elif self._bytesize == SIXBITS:
91            self._port_handle.DataBits = 6
92        elif self._bytesize == SEVENBITS:
93            self._port_handle.DataBits = 7
94        elif self._bytesize == EIGHTBITS:
95            self._port_handle.DataBits = 8
96        else:
97            raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
98
99        if self._parity == PARITY_NONE:
100            self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None')  # reserved keyword in Py3k
101        elif self._parity == PARITY_EVEN:
102            self._port_handle.Parity = System.IO.Ports.Parity.Even
103        elif self._parity == PARITY_ODD:
104            self._port_handle.Parity = System.IO.Ports.Parity.Odd
105        elif self._parity == PARITY_MARK:
106            self._port_handle.Parity = System.IO.Ports.Parity.Mark
107        elif self._parity == PARITY_SPACE:
108            self._port_handle.Parity = System.IO.Ports.Parity.Space
109        else:
110            raise ValueError("Unsupported parity mode: %r" % self._parity)
111
112        if self._stopbits == STOPBITS_ONE:
113            self._port_handle.StopBits = System.IO.Ports.StopBits.One
114        elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
115            self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive
116        elif self._stopbits == STOPBITS_TWO:
117            self._port_handle.StopBits = System.IO.Ports.StopBits.Two
118        else:
119            raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
120
121        if self._rtscts and self._xonxoff:
122            self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff
123        elif self._rtscts:
124            self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend
125        elif self._xonxoff:
126            self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff
127        else:
128            self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None')   # reserved keyword in Py3k
129
130    #~ def __del__(self):
131        #~ self.close()
132
133    def close(self):
134        """Close port"""
135        if self.is_open:
136            if self._port_handle:
137                try:
138                    self._port_handle.Close()
139                except System.IO.Ports.InvalidOperationException:
140                    # ignore errors. can happen for unplugged USB serial devices
141                    pass
142                self._port_handle = None
143            self.is_open = False
144
145    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
146
147    @property
148    def in_waiting(self):
149        """Return the number of characters currently in the input buffer."""
150        if not self.is_open:
151            raise PortNotOpenError()
152        return self._port_handle.BytesToRead
153
154    def read(self, size=1):
155        """\
156        Read size bytes from the serial port. If a timeout is set it may
157        return less characters as requested. With no timeout it will block
158        until the requested number of bytes is read.
159        """
160        if not self.is_open:
161            raise PortNotOpenError()
162        # must use single byte reads as this is the only way to read
163        # without applying encodings
164        data = bytearray()
165        while size:
166            try:
167                data.append(self._port_handle.ReadByte())
168            except System.TimeoutException:
169                break
170            else:
171                size -= 1
172        return bytes(data)
173
174    def write(self, data):
175        """Output the given string over the serial port."""
176        if not self.is_open:
177            raise PortNotOpenError()
178        #~ if not isinstance(data, (bytes, bytearray)):
179            #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
180        try:
181            # must call overloaded method with byte array argument
182            # as this is the only one not applying encodings
183            self._port_handle.Write(as_byte_array(data), 0, len(data))
184        except System.TimeoutException:
185            raise SerialTimeoutException('Write timeout')
186        return len(data)
187
188    def reset_input_buffer(self):
189        """Clear input buffer, discarding all that is in the buffer."""
190        if not self.is_open:
191            raise PortNotOpenError()
192        self._port_handle.DiscardInBuffer()
193
194    def reset_output_buffer(self):
195        """\
196        Clear output buffer, aborting the current output and
197        discarding all that is in the buffer.
198        """
199        if not self.is_open:
200            raise PortNotOpenError()
201        self._port_handle.DiscardOutBuffer()
202
203    def _update_break_state(self):
204        """
205        Set break: Controls TXD. When active, to transmitting is possible.
206        """
207        if not self.is_open:
208            raise PortNotOpenError()
209        self._port_handle.BreakState = bool(self._break_state)
210
211    def _update_rts_state(self):
212        """Set terminal status line: Request To Send"""
213        if not self.is_open:
214            raise PortNotOpenError()
215        self._port_handle.RtsEnable = bool(self._rts_state)
216
217    def _update_dtr_state(self):
218        """Set terminal status line: Data Terminal Ready"""
219        if not self.is_open:
220            raise PortNotOpenError()
221        self._port_handle.DtrEnable = bool(self._dtr_state)
222
223    @property
224    def cts(self):
225        """Read terminal status line: Clear To Send"""
226        if not self.is_open:
227            raise PortNotOpenError()
228        return self._port_handle.CtsHolding
229
230    @property
231    def dsr(self):
232        """Read terminal status line: Data Set Ready"""
233        if not self.is_open:
234            raise PortNotOpenError()
235        return self._port_handle.DsrHolding
236
237    @property
238    def ri(self):
239        """Read terminal status line: Ring Indicator"""
240        if not self.is_open:
241            raise PortNotOpenError()
242        #~ return self._port_handle.XXX
243        return False  # XXX an error would be better
244
245    @property
246    def cd(self):
247        """Read terminal status line: Carrier Detect"""
248        if not self.is_open:
249            raise PortNotOpenError()
250        return self._port_handle.CDHolding
251
252    # - - platform specific - - - -
253    # none
254