• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# encoding=utf-8
2
3'''
4======================================================================================
5版权 (C) 2015-2020, Huawei Technologies Co., HUTAF xDevice
6========================================================================================
7@FileName:    telnet_client.py
8@Function:    telnet operation to os
9@Author:
10@Date:
11======================================================================================
12'''
13
14import os
15import platform
16import telnetlib
17import time
18
19from util.log_info import logger
20
21class TelConnect():
22
23    def __init__(self, telip, telport):
24        logger.info("init telnet %s:%s" % (telip, telport))
25        self._host = telip
26        self._port = int(telport)
27        self.__timeout = 10
28        self.device = telnetlib.Telnet(self._host, port=self._port, timeout=self.__timeout)
29
30    def tellogin(self, username, password, endwaittag, timeout):
31        logger.info("telnet login")
32        rets = self.device.read_until(endwaittag, timeout)
33        logger.info("cmd result: %s" % rets)
34        if (endwaittag in rets) or not rets:
35            logger.info("Noneed login")
36            return True
37        else:
38            rets = self.sendCmdAndCheckResult(username.encode('utf-8'), 'Password:', timeout)
39            if rets == False:
40                logger.error("telnet login Failed!!")
41                return False
42            rets = self.sendCmdAndCheckResult(password.encode('utf-8'), endwaittag, timeout)
43            if rets == False:
44                logger.error("telnet login Failed!!")
45                return False
46
47    def sendCmd(self, send_cmd, endtag, timeout):
48        try:
49            logger.info(send_cmd)
50            self.device.write(send_cmd)
51            rets = self.device.read_until(endtag.encode('utf-8'), timeout)
52            return rets.decode('utf-8', 'ignore')
53        except Exception as e:
54            logger.error(e)
55            return False
56
57    def sendEnterCmd(self, endtag, timeout):
58        try:
59            logger.info("send enter to board")
60            i = 0
61            while i < 4:
62                cmd = b'\r\n'
63                rets = self.sendCmd(cmd, endtag, 1)
64                if (endtag in rets):
65                    return True
66                logger.info("retry send Enter")
67                i = i + 1
68            return False
69        except Exception as e:
70            logger.error(e)
71            return False
72
73    def sendCmdAndCheckResult(self, cmd, endtag, timeout):
74        try:
75            rets = self.sendCmd(cmd + '\n'.encode('utf-8'), endtag, timeout)
76            logger.info("result: %s" % rets)
77            if (endtag in rets):
78                if (not "error:" in rets.lower()):
79                    return True
80            logger.error("cmd not end or cmd failed, please check board !!!")
81            return False
82        except Exception as e:
83            logger.error(e)
84            return False
85
86    def sendCmdAndCheckSucTag(self, cmd, endtag, suctag , timeout):
87        try:
88            rets = self.sendCmd(cmd + '\n'.encode('utf-8'), endtag, timeout)
89            logger.info("result: %s" % rets)
90            if (endtag in rets):
91                if (suctag in rets) or (not "error:" in rets.lower()):
92                    return True
93            logger.error("cmd not end or cmd failed, please check board !!!")
94            return False
95        except Exception as e:
96            logger.error(e)
97            return False
98
99    def sendUpgradeCmd(self, sendcmd, endwaittag, suctag, timeout):
100        if suctag:
101            logger.info("cmd is:  %s" % sendcmd)
102            return self.sendCmdAndCheckSucTag(sendcmd.encode('utf-8'), endwaittag, suctag, timeout)
103        else:
104            logger.info("cmd is:  %s" % sendcmd)
105            return self.sendCmdAndCheckResult(sendcmd.encode('utf-8'), endwaittag, timeout)
106
107    def sendResetCmd(self, endtag, timeout):
108        logger.info("send cmd reset")
109        sendcmd = b'reset\n'
110        try:
111            #rets = self.sendCmd(sendcmd, endtag, timeout)
112            self.device.write(sendcmd)
113            if not self.sendEnterCmd(endtag, timeout):
114                logger.error("send Enter fail")
115                return False
116            return True
117        except Exception as e:
118            logger.error(e)
119            return False
120
121    def getBoardType(self, endtag, timeout):
122        rcvStr = self.sendCmd(b'\r\n', endtag, timeout)
123        if 'HMOS' in rcvStr or 'OHOS' in rcvStr :
124            ostype = 'OHOS'
125        elif 'hisilicon' in rcvStr:
126            ostype = 'uboot'
127        elif ' #' in rcvStr:
128            ostype = 'linux'
129        else:
130            ostype = 'bootrom'
131        logger.info("board type is: %s" % ostype)
132        return ostype
133
134    def close(self):
135        self.device.close()
136
137
138