1# Copyright (c) 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Interface for SCPI Protocol. 6 7Helper module to communicate with devices that uses SCPI protocol. 8 9https://en.wikipedia.org/wiki/Standard_Commands_for_Programmable_Instruments 10 11This will be used by RF Switch that was designed to connect WiFi AP and 12WiFi Clients RF enclosures for interoperability testing. 13 14""" 15 16import logging 17import socket 18import sys 19 20 21class ScpiException(Exception): 22 """Exception for SCPI Errors.""" 23 24 def __init__(self, msg=None, cause=None): 25 messages = [] 26 if msg: 27 messages.append(msg) 28 if cause: 29 messages.append('Wrapping exception: %s: %s' % ( 30 type(cause).__name__, str(cause))) 31 super(ScpiException, self).__init__(', '.join(messages)) 32 33 34class Scpi(object): 35 """Controller for devices using SCPI protocol.""" 36 37 SCPI_PORT = 5025 38 DEFAULT_READ_LEN = 4096 39 40 CMD_IDENTITY = '*IDN?' 41 CMD_RESET = '*RST' 42 CMD_STATUS = '*STB?' 43 CMD_ERROR_CHECK = 'SYST:ERR?' 44 45 def __init__(self, host, port=SCPI_PORT): 46 """ 47 Controller for devices using SCPI protocol. 48 49 @param host: hostname or IP address of device using SCPI protocol 50 @param port: Int SCPI port number (default 5025) 51 52 @raises SCPIException: on error connecting to device 53 54 """ 55 self.host = host 56 self.port = port 57 58 # Open a socket connection for communication with chassis. 59 try: 60 self.socket = socket.socket() 61 self.socket.connect((host, port)) 62 except (socket.error, socket.timeout) as e: 63 logging.error('Error connecting to SCPI device.') 64 raise ScpiException(cause=e), None, sys.exc_info()[2] 65 66 def close(self): 67 """Close the connection.""" 68 if hasattr(self, 'socket'): 69 self.socket.close() 70 del self.socket 71 72 def write(self, data): 73 """Send data to socket. 74 75 @param data: Data to send 76 77 @returns number of bytes sent 78 79 """ 80 return self.socket.send(data) 81 82 def read(self, buffer_size=DEFAULT_READ_LEN): 83 """Safely read the query response. 84 85 @param buffer_size: Int max data to read at once (default 4096) 86 87 @returns String data read from the socket 88 89 """ 90 return str(self.socket.recv(buffer_size)) 91 92 def query(self, data, buffer_size=DEFAULT_READ_LEN): 93 """Send the query and get response. 94 95 @param data: data (Query) to send 96 @param buffer_size: Int max data to read at once (default 4096) 97 98 @returns String data read from the socket 99 100 """ 101 self.write(data) 102 return self.read(buffer_size) 103 104 def info(self): 105 """Get Chassis Info. 106 107 @returns dictionary information of Chassis 108 109 """ 110 # Returns a comma separated text as below converted to dict. 111 # 'VTI Instruments Corporation,EX7200-S-11539,138454,3.13.8\n' 112 return dict( 113 zip(('Manufacturer', 'Model', 'Serial', 'Version'), 114 self.query('%s\n' % self.CMD_IDENTITY) 115 .strip().split(',', 3))) 116 117 def reset(self): 118 """Reset the chassis. 119 120 @returns number of bytes sent 121 """ 122 return self.write('%s\n' % self.CMD_RESET) 123 124 def status(self): 125 """Get status of relays. 126 127 @returns Int status of relays 128 129 """ 130 return int(self.query('%s\n' % self.CMD_STATUS)) 131 132 def error_query(self): 133 """Check for any error. 134 135 @returns tuple of error code and error message 136 137 """ 138 code, msg = self.query('%s\n' % self.CMD_ERROR_CHECK).split(', ') 139 return int(code), msg.strip().strip('"') 140