1import sys, time 2import select 3from socket import socket 4from util.log_info import logger 5 6class SocketCon(socket): 7 def __init__(self, ip, port, timeout=10, encoding="GBK"): 8 ''' 9 #=================================================================================== 10 # @Method: __init__(self, ip, port,timeout=20,encoding="GBK") 11 # @Func: 初始化方法,创建telnet连接 12 # @Param: ip: ip地址 13 # port:端口号 14 # timeout:超时时间 15 # encoding:字符串和字节转换的格式 16 # @eg: __init__("127.0.0.1", 7788,10,utf-8) 17 # @return: none 18 #=================================================================================== 19 ''' 20 super().__init__() 21 try: 22 self.encoding = encoding 23 self.settimeout(timeout) 24 self.connect((ip, port)) 25 except Exception as e: 26 if isinstance(e, ConnectionRefusedError): 27 logger.error("ConnectionRefusedError") 28 else: 29 logger.error(e) 30 31 def closeCon(self): 32 ''' 33 #=================================================================================== 34 # @Method: __init__(self, ip, port,timeout=20,encoding="GBK") 35 # @Func: 关闭连接 36 # @Param: 无 37 # @eg: closeCon() 38 # @return: none 39 #=================================================================================== 40 ''' 41 self.close() 42 43 def send(self, msg): 44 ''' 45 #=================================================================================== 46 # @Method: send(self,msg) 47 # @Func: 发送消息给box或串口服务器 48 # @Param: msg 49 # @eg: send("XDEV USB SIGNal 0 ON") 50 # @return: Bool 51 #=================================================================================== 52 ''' 53 try: 54 self.sendall(msg.encode(self.encoding)) 55 return True 56 except Exception as e: 57 logger.error(e) 58 return False 59 60 def login(self, user="Administrator", password=""): 61 ''' 62 #=================================================================================== 63 # @Method: login(self,user,password) 64 # @Func: 串口服务器认证 65 # @Param: user:用户名 66 # password:密码 67 # @eg: login("Administrator","") 68 # @return: Bool 69 #=================================================================================== 70 ''' 71 msg = user + "/" + password 72 73 if self.send(msg): 74 recv = self.receive() 75 if recv == "认证错误!": 76 return False 77 elif recv == "Error": 78 return False 79 else: 80 return True 81 else: 82 return False 83 84 def receive(self, size=16384): 85 ''' 86 #=================================================================================== 87 # @Method: receive(self,user,size) 88 # @Func: 接收远程主机或串口服务器返回的消息 89 # @Param: size: 90 # @eg: receive(16384) 91 # @return: str 92 #=================================================================================== 93 ''' 94 95 try: 96 ready = select.select([self], [], [], 2) 97 if ready[0]: 98 ret = self.recv(size) 99 if isinstance(ret, bytes): 100 recv = ret.decode(self.encoding) 101 return recv 102 return "" 103 except Exception as e: 104 logger.error(e) 105 return "Error" 106 107 108def serialPowerOnOff(ip, port, index, power, user='Administrator', password=''): 109 ''' 110 #=================================================================================== 111 # @Method: serialPowerOnOff(ip, port,index,power,user='Administrator', password='') 112 # @Func: 上下电执行方法 113 # @Param: ip: 串口服务器ip 114 # port:上下电命令端口 115 # index:1-48,电源端口序号 116 # power:on/off,上电/下电操作 117 # user:串口服务器的用户名,默认取Administrator 118 # password:串口服务器的密码,默认取空 119 # @eg: serialPowerOnOff("10.176.49.48", 18984,1,"on") 120 # @return: none 121 #=================================================================================== 122 ''' 123 try: 124 serialPort = SocketCon(ip, int(port)) 125 126 # 串口服务器认证 127 if not serialPort.login(user=user, password=password): 128 logger.error("认证错误") 129 return False 130 131 if power == "on": 132 # 串口上电指令 133 sendMsg = "ctrlpower." + str(index) + "=On\n" 134 # 上电成功预期返回消息 135 recvMsg = "ctrlpower." + str(index) + "=1\n" 136 elif power == "off": 137 # 串口下电指令 138 sendMsg = "ctrlpower." + str(index) + "=Off\n" 139 # 下电成功预期返回消息 140 recvMsg = "ctrlpower." + str(index) + "=0\n" 141 else: 142 logger.error("请给on/off指令") 143 return False 144 145 if serialPort.send(sendMsg): 146 recv = serialPort.receive() 147 148 # 返回消息与预期相同,认为操作成功 149 if recv == recvMsg: 150 logger.info("操作成功") 151 return True 152 else: 153 logger.error("操作失败") 154 return False 155 else: 156 logger.error("操作失败") 157 return False 158 except Exception as e: 159 logger.error(e) 160 return False 161 finally: 162 serialPort.closeCon() 163 164def usbPowerOnOff(ip, port, index, power): 165 ''' 166 #=================================================================================== 167 # @Method: usbPowerOnOff(ip, port) 168 # @Func: GT3000上下电执行方法 169 # @Param: ip: box IP 170 # port:GT3000端口 171 # index:1-8,usb槽位序号 172 # power:on/off,上电/下电操作 173 # @eg: usbPowerOnOff("127.0.0.1", 7788,"on") 174 # @return: none 175 #=================================================================================== 176 ''' 177 178 usbpower = SocketCon(ip, int(port)) 179 180 try: 181 if power == "on": 182 sendMsg = "XDEV USB POWEr " + str(index) + " ON\n" 183 elif power == "off": 184 sendMsg = "XDEV USB POWEr " + str(index) + " OFF\n" 185 else: 186 logger.info("请给on/off指令") 187 return False 188 189 # 上下电成功预期返回消息 190 recvMsg = "0,Success\n" 191 192 if usbpower.send(sendMsg): 193 recv = usbpower.receive() 194 195 logger.info(sendMsg) 196 logger.info(recv) 197 # 返回消息与预期相同,认为操作成功 198 if "0" in recv and "Success" in recv: 199 logger.info("操作成功") 200 return True 201 else: 202 logger.error("操作失败") 203 return False 204 else: 205 logger.error("操作失败") 206 return False 207 except Exception as e: 208 logger.error(e) 209 return False 210 finally: 211 usbpower.closeCon() 212 213def usbPowerOnOffV2(ip, port, index, power): 214 ''' 215 #=================================================================================== 216 # @Method: usbPowerOnOff_v2(ip, port) 217 # @Func: GT3000上下电执行方法,GT3000版本變化,命令有變化 218 # @Param: ip: box IP 219 # port:GT3000端口 220 # index:1-8,usb槽位序号 221 # power:on/off,上电/下电操作 222 # @eg: usbPowerOnOff("127.0.0.1", 7788,"on") 223 # @return: none 224 #=================================================================================== 225 ''' 226 227 usbpower = SocketCon(ip, int(port)) 228 229 try: 230 if power == "on": 231 sendMsg = "xdev usb signal " + str(index) + " on\n" 232 elif power == "off": 233 sendMsg = "xdev usb signal " + str(index) + " off\n" 234 else: 235 logger.info("请给on/off指令") 236 return False 237 238 # 上下电成功预期返回消息 239 recvMsg = "0,Success\n" 240 241 if usbpower.send(sendMsg): 242 recv = usbpower.receive() 243 244 logger.info(sendMsg) 245 logger.info(recv) 246 # 返回消息与预期相同,认为操作成功 247 if "0" in recv and "Success" in recv: 248 logger.info("操作成功") 249 return True 250 else: 251 logger.error("操作失败") 252 return False 253 else: 254 logger.error("操作失败") 255 return False 256 except Exception as e: 257 logger.error(e) 258 return False 259 finally: 260 usbpower.closeCon()