1#! python 2# Python Serial Port Extension for Win32, Linux, BSD, Jython and .NET/Mono 3# serial driver for .NET/Mono (IronPython), .NET >= 2 4# see __init__.py 5# 6# (C) 2008 Chris Liechti <cliechti@gmx.net> 7# this is distributed under a free software license, see license.txt 8 9import clr 10import System 11import System.IO.Ports 12from serial.serialutil import * 13 14 15def device(portnum): 16 """Turn a port number into a device name""" 17 return System.IO.Ports.SerialPort.GetPortNames()[portnum] 18 19 20# must invoke function with byte array, make a helper to convert strings 21# to byte arrays 22sab = System.Array[System.Byte] 23def as_byte_array(string): 24 return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython 25 26class IronSerial(SerialBase): 27 """Serial port implementation for .NET/Mono.""" 28 29 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 30 9600, 19200, 38400, 57600, 115200) 31 32 def open(self): 33 """Open port with current settings. This may throw a SerialException 34 if the port cannot be opened.""" 35 if self._port is None: 36 raise SerialException("Port must be configured before it can be used.") 37 if self._isOpen: 38 raise SerialException("Port is already open.") 39 try: 40 self._port_handle = System.IO.Ports.SerialPort(self.portstr) 41 except Exception, msg: 42 self._port_handle = None 43 raise SerialException("could not open port %s: %s" % (self.portstr, msg)) 44 45 self._reconfigurePort() 46 self._port_handle.Open() 47 self._isOpen = True 48 if not self._rtscts: 49 self.setRTS(True) 50 self.setDTR(True) 51 self.flushInput() 52 self.flushOutput() 53 54 def _reconfigurePort(self): 55 """Set communication parameters on opened port.""" 56 if not self._port_handle: 57 raise SerialException("Can only operate on a valid port handle") 58 59 #~ self._port_handle.ReceivedBytesThreshold = 1 60 61 if self._timeout is None: 62 self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout 63 else: 64 self._port_handle.ReadTimeout = int(self._timeout*1000) 65 66 # if self._timeout != 0 and self._interCharTimeout is not None: 67 # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:] 68 69 if self._writeTimeout is None: 70 self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout 71 else: 72 self._port_handle.WriteTimeout = int(self._writeTimeout*1000) 73 74 75 # Setup the connection info. 76 try: 77 self._port_handle.BaudRate = self._baudrate 78 except IOError, e: 79 # catch errors from illegal baudrate settings 80 raise ValueError(str(e)) 81 82 if self._bytesize == FIVEBITS: 83 self._port_handle.DataBits = 5 84 elif self._bytesize == SIXBITS: 85 self._port_handle.DataBits = 6 86 elif self._bytesize == SEVENBITS: 87 self._port_handle.DataBits = 7 88 elif self._bytesize == EIGHTBITS: 89 self._port_handle.DataBits = 8 90 else: 91 raise ValueError("Unsupported number of data bits: %r" % self._bytesize) 92 93 if self._parity == PARITY_NONE: 94 self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k 95 elif self._parity == PARITY_EVEN: 96 self._port_handle.Parity = System.IO.Ports.Parity.Even 97 elif self._parity == PARITY_ODD: 98 self._port_handle.Parity = System.IO.Ports.Parity.Odd 99 elif self._parity == PARITY_MARK: 100 self._port_handle.Parity = System.IO.Ports.Parity.Mark 101 elif self._parity == PARITY_SPACE: 102 self._port_handle.Parity = System.IO.Ports.Parity.Space 103 else: 104 raise ValueError("Unsupported parity mode: %r" % self._parity) 105 106 if self._stopbits == STOPBITS_ONE: 107 self._port_handle.StopBits = System.IO.Ports.StopBits.One 108 elif self._stopbits == STOPBITS_ONE_POINT_FIVE: 109 self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive 110 elif self._stopbits == STOPBITS_TWO: 111 self._port_handle.StopBits = System.IO.Ports.StopBits.Two 112 else: 113 raise ValueError("Unsupported number of stop bits: %r" % self._stopbits) 114 115 if self._rtscts and self._xonxoff: 116 self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff 117 elif self._rtscts: 118 self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend 119 elif self._xonxoff: 120 self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff 121 else: 122 self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k 123 124 #~ def __del__(self): 125 #~ self.close() 126 127 def close(self): 128 """Close port""" 129 if self._isOpen: 130 if self._port_handle: 131 try: 132 self._port_handle.Close() 133 except System.IO.Ports.InvalidOperationException: 134 # ignore errors. can happen for unplugged USB serial devices 135 pass 136 self._port_handle = None 137 self._isOpen = False 138 139 def makeDeviceName(self, port): 140 try: 141 return device(port) 142 except TypeError, e: 143 raise SerialException(str(e)) 144 145 # - - - - - - - - - - - - - - - - - - - - - - - - 146 147 def inWaiting(self): 148 """Return the number of characters currently in the input buffer.""" 149 if not self._port_handle: raise portNotOpenError 150 return self._port_handle.BytesToRead 151 152 def read(self, size=1): 153 """Read size bytes from the serial port. If a timeout is set it may 154 return less characters as requested. With no timeout it will block 155 until the requested number of bytes is read.""" 156 if not self._port_handle: raise portNotOpenError 157 # must use single byte reads as this is the only way to read 158 # without applying encodings 159 data = bytearray() 160 while size: 161 try: 162 data.append(self._port_handle.ReadByte()) 163 except System.TimeoutException, e: 164 break 165 else: 166 size -= 1 167 return bytes(data) 168 169 def write(self, data): 170 """Output the given string over the serial port.""" 171 if not self._port_handle: raise portNotOpenError 172 if not isinstance(data, (bytes, bytearray)): 173 raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) 174 try: 175 # must call overloaded method with byte array argument 176 # as this is the only one not applying encodings 177 self._port_handle.Write(as_byte_array(data), 0, len(data)) 178 except System.TimeoutException, e: 179 raise writeTimeoutError 180 return len(data) 181 182 def flushInput(self): 183 """Clear input buffer, discarding all that is in the buffer.""" 184 if not self._port_handle: raise portNotOpenError 185 self._port_handle.DiscardInBuffer() 186 187 def flushOutput(self): 188 """Clear output buffer, aborting the current output and 189 discarding all that is in the buffer.""" 190 if not self._port_handle: raise portNotOpenError 191 self._port_handle.DiscardOutBuffer() 192 193 def sendBreak(self, duration=0.25): 194 """Send break condition. Timed, returns to idle state after given duration.""" 195 if not self._port_handle: raise portNotOpenError 196 import time 197 self._port_handle.BreakState = True 198 time.sleep(duration) 199 self._port_handle.BreakState = False 200 201 def setBreak(self, level=True): 202 """Set break: Controls TXD. When active, to transmitting is possible.""" 203 if not self._port_handle: raise portNotOpenError 204 self._port_handle.BreakState = bool(level) 205 206 def setRTS(self, level=True): 207 """Set terminal status line: Request To Send""" 208 if not self._port_handle: raise portNotOpenError 209 self._port_handle.RtsEnable = bool(level) 210 211 def setDTR(self, level=True): 212 """Set terminal status line: Data Terminal Ready""" 213 if not self._port_handle: raise portNotOpenError 214 self._port_handle.DtrEnable = bool(level) 215 216 def getCTS(self): 217 """Read terminal status line: Clear To Send""" 218 if not self._port_handle: raise portNotOpenError 219 return self._port_handle.CtsHolding 220 221 def getDSR(self): 222 """Read terminal status line: Data Set Ready""" 223 if not self._port_handle: raise portNotOpenError 224 return self._port_handle.DsrHolding 225 226 def getRI(self): 227 """Read terminal status line: Ring Indicator""" 228 if not self._port_handle: raise portNotOpenError 229 #~ return self._port_handle.XXX 230 return False #XXX an error would be better 231 232 def getCD(self): 233 """Read terminal status line: Carrier Detect""" 234 if not self._port_handle: raise portNotOpenError 235 return self._port_handle.CDHolding 236 237 # - - platform specific - - - - 238 # none 239 240 241# assemble Serial class with the platform specific implementation and the base 242# for file-like behavior. for Python 2.6 and newer, that provide the new I/O 243# library, derive from io.RawIOBase 244try: 245 import io 246except ImportError: 247 # classic version with our own file-like emulation 248 class Serial(IronSerial, FileLike): 249 pass 250else: 251 # io library present 252 class Serial(IronSerial, io.RawIOBase): 253 pass 254 255 256# Nur Testfunktion!! 257if __name__ == '__main__': 258 import sys 259 260 s = Serial(0) 261 sys.stdio.write('%s\n' % s) 262 263 s = Serial() 264 sys.stdio.write('%s\n' % s) 265 266 267 s.baudrate = 19200 268 s.databits = 7 269 s.close() 270 s.port = 0 271 s.open() 272 sys.stdio.write('%s\n' % s) 273 274