• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys, time
2import select
3from socket import socket
4from util.log_info import logger
5
6class SocketCon(socket):
7    def __init__(self, ip, port, timeout=10, encoding="GBK"):
8        '''
9       #===================================================================================
10       #   @Method:        __init__(self, ip, port,timeout=20,encoding="GBK")
11       #   @Func:          初始化方法,创建telnet连接
12       #   @Param:         ip: ip地址
13       #                   port:端口号
14       #                   timeout:超时时间
15       #                   encoding:字符串和字节转换的格式
16       #   @eg:            __init__("127.0.0.1", 7788,10,utf-8)
17       #   @return:        none
18       #===================================================================================
19       '''
20        super().__init__()
21        try:
22            self.encoding = encoding
23            self.settimeout(timeout)
24            self.connect((ip, port))
25        except Exception as e:
26            if isinstance(e, ConnectionRefusedError):
27                logger.error("ConnectionRefusedError")
28            else:
29                logger.error(e)
30
31    def closeCon(self):
32        '''
33        #===================================================================================
34        #   @Method:        __init__(self, ip, port,timeout=20,encoding="GBK")
35        #   @Func:          关闭连接
36        #   @Param:         无
37        #   @eg:            closeCon()
38        #   @return:        none
39        #===================================================================================
40        '''
41        self.close()
42
43    def send(self, msg):
44        '''
45       #===================================================================================
46       #   @Method:        send(self,msg)
47       #   @Func:          发送消息给box或串口服务器
48       #   @Param:         msg
49       #   @eg:            send("XDEV USB SIGNal 0 ON")
50       #   @return:        Bool
51       #===================================================================================
52       '''
53        try:
54            self.sendall(msg.encode(self.encoding))
55            return True
56        except Exception as e:
57            logger.error(e)
58            return False
59
60    def login(self, user="Administrator", password=""):
61        '''
62       #===================================================================================
63       #   @Method:        login(self,user,password)
64       #   @Func:          串口服务器认证
65       #   @Param:         user:用户名
66       #                   password:密码
67       #   @eg:            login("Administrator","")
68       #   @return:        Bool
69       #===================================================================================
70       '''
71        msg = user + "/" + password
72
73        if self.send(msg):
74            recv = self.receive()
75            if recv == "认证错误!":
76                return False
77            elif recv == "Error":
78                return False
79            else:
80                return True
81        else:
82            return False
83
84    def receive(self, size=16384):
85        '''
86       #===================================================================================
87       #   @Method:        receive(self,user,size)
88       #   @Func:          接收远程主机或串口服务器返回的消息
89       #   @Param:         size:
90       #   @eg:            receive(16384)
91       #   @return:        str
92       #===================================================================================
93       '''
94
95        try:
96            ready = select.select([self], [], [], 2)
97            if ready[0]:
98                ret = self.recv(size)
99                if isinstance(ret, bytes):
100                    recv = ret.decode(self.encoding)
101                    return recv
102                return ""
103        except Exception as e:
104            logger.error(e)
105            return "Error"
106
107
108def serialPowerOnOff(ip, port, index, power, user='Administrator', password=''):
109    '''
110    #===================================================================================
111    #   @Method:        serialPowerOnOff(ip, port,index,power,user='Administrator', password='')
112    #   @Func:          上下电执行方法
113    #   @Param:         ip: 串口服务器ip
114    #                   port:上下电命令端口
115    #                   index:1-48,电源端口序号
116    #                   power:on/off,上电/下电操作
117    #                   user:串口服务器的用户名,默认取Administrator
118    #                   password:串口服务器的密码,默认取空
119    #   @eg:            serialPowerOnOff("10.176.49.48", 18984,1,"on")
120    #   @return:        none
121    #===================================================================================
122    '''
123    try:
124        serialPort = SocketCon(ip, int(port))
125
126        # 串口服务器认证
127        if not serialPort.login(user=user, password=password):
128            logger.error("认证错误")
129            return False
130
131        if power == "on":
132            # 串口上电指令
133            sendMsg = "ctrlpower." + str(index) + "=On\n"
134            # 上电成功预期返回消息
135            recvMsg = "ctrlpower." + str(index) + "=1\n"
136        elif power == "off":
137            # 串口下电指令
138            sendMsg = "ctrlpower." + str(index) + "=Off\n"
139            # 下电成功预期返回消息
140            recvMsg = "ctrlpower." + str(index) + "=0\n"
141        else:
142            logger.error("请给on/off指令")
143            return False
144
145        if serialPort.send(sendMsg):
146            recv = serialPort.receive()
147
148            # 返回消息与预期相同,认为操作成功
149            if recv == recvMsg:
150                logger.info("操作成功")
151                return True
152            else:
153                logger.error("操作失败")
154                return False
155        else:
156            logger.error("操作失败")
157            return False
158    except Exception as e:
159        logger.error(e)
160        return False
161    finally:
162        serialPort.closeCon()
163
164def usbPowerOnOff(ip, port, index, power):
165    '''
166    #===================================================================================
167    #   @Method:        usbPowerOnOff(ip, port)
168    #   @Func:          GT3000上下电执行方法
169    #   @Param:         ip: box IP
170    #                   port:GT3000端口
171    #                   index:1-8,usb槽位序号
172    #                   power:on/off,上电/下电操作
173    #   @eg:            usbPowerOnOff("127.0.0.1", 7788,"on")
174    #   @return:        none
175    #===================================================================================
176    '''
177
178    usbpower = SocketCon(ip, int(port))
179
180    try:
181        if power == "on":
182            sendMsg = "XDEV USB POWEr " + str(index) + " ON\n"
183        elif power == "off":
184            sendMsg = "XDEV USB POWEr " + str(index) + " OFF\n"
185        else:
186            logger.info("请给on/off指令")
187            return False
188
189        # 上下电成功预期返回消息
190        recvMsg = "0,Success\n"
191
192        if usbpower.send(sendMsg):
193            recv = usbpower.receive()
194
195            logger.info(sendMsg)
196            logger.info(recv)
197            # 返回消息与预期相同,认为操作成功
198            if "0" in recv and "Success" in recv:
199                logger.info("操作成功")
200                return True
201            else:
202                logger.error("操作失败")
203                return False
204        else:
205            logger.error("操作失败")
206            return False
207    except Exception as e:
208        logger.error(e)
209        return False
210    finally:
211        usbpower.closeCon()
212
213def usbPowerOnOffV2(ip, port, index, power):
214    '''
215    #===================================================================================
216    #   @Method:        usbPowerOnOff_v2(ip, port)
217    #   @Func:          GT3000上下电执行方法,GT3000版本變化,命令有變化
218    #   @Param:         ip: box IP
219    #                   port:GT3000端口
220    #                   index:1-8,usb槽位序号
221    #                   power:on/off,上电/下电操作
222    #   @eg:            usbPowerOnOff("127.0.0.1", 7788,"on")
223    #   @return:        none
224    #===================================================================================
225    '''
226
227    usbpower = SocketCon(ip, int(port))
228
229    try:
230        if power == "on":
231            sendMsg = "xdev usb signal " + str(index) + " on\n"
232        elif power == "off":
233            sendMsg = "xdev usb signal " + str(index) + " off\n"
234        else:
235            logger.info("请给on/off指令")
236            return False
237
238        # 上下电成功预期返回消息
239        recvMsg = "0,Success\n"
240
241        if usbpower.send(sendMsg):
242            recv = usbpower.receive()
243
244            logger.info(sendMsg)
245            logger.info(recv)
246            # 返回消息与预期相同,认为操作成功
247            if "0" in recv and "Success" in recv:
248                logger.info("操作成功")
249                return True
250            else:
251                logger.error("操作失败")
252                return False
253        else:
254            logger.error("操作失败")
255            return False
256    except Exception as e:
257        logger.error(e)
258        return False
259    finally:
260        usbpower.closeCon()