• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6
7from core.base import BaseApp, dec_stepmsg
8from util.file_locker import FileLock
9from util.log_info import logger
10from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
11from aw.Telnet.TelnetClient import TelConnect
12from aw.Common.Constant import CONSTANT
13from aw.Download.Download import *
14from aw.Common.Common import getHostIp
15from aw.ExtractFile.ExtractFile import *
16from aw.poweronoff.serial_power_on_off import serialPowerOnOff
17
# Name of the lock file used to serialise concurrent downloads into one directory.
lock_suffix = CONSTANT.File.LOCK_SUFFIX
# Marker file whose presence tells that a version was downloaded successfully.
suc_file = CONSTANT.File.SUC_FILE
# Marker file written when a download failed.
failed_file = CONSTANT.File.FAILED_FILE
# Read timeouts handed to the telnet helpers (presumably seconds - TODO confirm).
READ_MAXTIMEOUT = 600   # used for tftp transfer commands
READ_TIMEOUT = 30       # used for reset, "go" and generic commands
READ_MINITIMEOUT = 5    # used for setenv commands and board-type probes
# Prompt text waited for after u-boot commands.
uboot_finish = 'hisilicon #'
# Prompt text waited for after other commands.
cmd_finish = ' #'
26
class liteOsUpgrade_L1(BaseApp):
    '''
    Flash step for an L1 board: download the image package (and an optional
    upgrade script), then replay the upgrade script on the board over telnet.

    @author: w00278233
    '''

    def __init__(self, param_file):
        '''
        #===================================================================================
        #   @Method:        __init__(self, param_file)
        #   @Func:          initialise through BaseApp and record the parameter
        #                   names declared by this step
        #   @Param:         param_file: parameter file handed to BaseApp
        #===================================================================================
        '''
        super().__init__(param_file)
        self.param_List = ["Telnet_IP",
                           "Telnet_Port",
                           "upgrade_upgradeLocation"]

    @dec_stepmsg("hongmeng L1 flash")
    def excute(self):
        '''
        #===================================================================================
        #   @Method:        excute(self)
        #   @Precondition:  none
        #   @Func:          entry point of the step: download first, then upgrade.
        #                   On failure CONSTANT.ENVERRMESSAGE is set and logged.
        #                   Exceptions from download()/upgrade() propagate unchanged.
        #   @PostStatus:    none
        #   @eg:            excute()
        #   @return:        True or False
        #===================================================================================
        '''
        # Download stage
        if not self.download():
            CONSTANT.ENVERRMESSAGE = "image download fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False

        # Upgrade stage
        if not self.upgrade():
            CONSTANT.ENVERRMESSAGE = "board upgrade fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False
        return True

    @dec_stepmsg("download")
    @timeout(1800)
    def download(self):
        '''
        #===================================================================================
        #   @Method:        download(self)
        #   @Precondition:  none
        #   @Func:          build the local save path and download the image package
        #                   and, when "upgrade_script" is configured, the script.
        #                   Sets the module globals version_savepath / version_name,
        #                   which upgrade() reads afterwards.
        #   @PostStatus:    none
        #   @eg:            download()
        #   @return:        True or False
        #===================================================================================
        '''
        global version_savepath, version_name
        dir_path = CONSTANT.Path.getDirPath()
        pbiid = self.params_dict.get("pbiid")
        if pbiid:
            version_path = pbiid
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(pbiid) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, version_path))
            version_savepath = os.path.join(dir_path, version_name)

        # The globals above are already set, so upgrade() still works when the
        # download itself is skipped.
        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            return True

        # Download the images
        import hashlib
        image_dir = os.path.join(version_savepath, "img")
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        if not self.excutedown(version_path, image_dir, save_path_file, False):
            logger.error("download img fail")
            return False

        # Record the local version path so devicetest can pick test cases from it
        saveVersion(save_path_file, image_dir)

        # Download the upgrade script
        upgrade_script = self.params_dict.get("upgrade_script")
        if upgrade_script:
            script_dir = os.path.join(version_savepath, "scriptfile")
            suc_mark = os.path.join(script_dir, suc_file)
            if not self.excutedown(upgrade_script, script_dir, suc_mark, True):
                logger.error("download upgrade script fail")
                return False
            # Touch the success marker
            with open(suc_mark, "a+") as fp:
                fp.write("")

        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        '''
        #===================================================================================
        #   @Method:        excutedown(source_path, download_dir, suc_mark, is_file)
        #   @Precondition:  none
        #   @Func:          perform one download under a file lock and unpack
        #                   .zip / .tar.gz packages into download_dir
        #   @PostStatus:    the failed-marker file is touched when the result is falsy
        #   @Param:         source_path:  location of the resource to fetch
        #                   download_dir: local directory to download into
        #                   suc_mark:     success marker passed to isDownLoadSuccess
        #                   is_file:      whether source_path is a single file
        #   @eg:            excutedown("xxxx", "D:\\local\\image", "D:\\local\\suc", False)
        #   @return:        True or False (result of the last download/extract helper)
        #   @raise:         re-raises any exception from the helpers after logging it
        #===================================================================================
        '''
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            if self.params_dict.get("bt_enable") == "True":
                # NOTE(review): os_method is not defined in this method or module;
                # unless a star-import provides it this branch raises NameError.
                ret = downloadByBitComet(source_path, download_dir, os_method)
            elif source_path.startswith('\\\\'):
                ret = downloadByCopy(source_path, download_dir, is_file)
            elif self.params_dict.get("pbiid"):
                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"),
                                             "FASTBOOT", self.params_dict.get("pbiid"))
            elif source_path.startswith("http"):
                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)

            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)

            from_file_id = source_path.startswith("http") and ("file_id=" in source_path)
            if source_path.endswith(".tar.gz") or from_file_id:
                if not from_file_id:
                    zip_name = os.path.basename(source_path)
                elif source_path.endswith(".tar.gz"):
                    # The archive name is the text after the last '=' of the URL
                    zip_name = source_path.split('=')[-1]
                else:
                    zip_name = "out.tar.gz"
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # Touch the failure marker
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            logger.printLog(e)
            # Bare raise keeps the original exception type and traceback.
            raise
        finally:
            file_lock.releaseFile()

    @dec_stepmsg("upgrade")
    @timeout(900)
    def upgrade(self):
        '''
        #===================================================================================
        #   @Method:        upgrade(self)
        #   @Precondition:  download() has run, so the module globals
        #                   version_name / version_savepath are set
        #   @Func:          bring the board to the u-boot prompt, replay the upgrade
        #                   script line by line over telnet and check the board
        #                   reports "OHOS" afterwards
        #   @PostStatus:    the telnet connection is closed
        #   @eg:            upgrade()
        #   @return:        True or False
        #===================================================================================
        '''

        tel_IP = self.params_dict.get("Telnet_IP")
        tel_port = self.params_dict.get("Telnet_Port")
        tftp_ip = self.params_dict.get("Tftpserver_IP")
        device_ip = self.params_dict.get("Device_IP")
        device_Mac = self.params_dict.get("Device_MAC")
        device_netmask = self.params_dict.get("Device_Netmask")
        device_gatewayip = self.params_dict.get("Device_GatewayIP")
        # Chip type; selects the default flashing script
        flash_type = self.params_dict.get("flash_type")
        # Command port of the serial server
        tel_cmd_port = self.params_dict.get("cmd_port")
        # Power port of the board
        power_port = self.params_dict.get("board_power_port")
        tel_username = self.params_dict.get("server_cmd_username")
        tel_passwd = self.params_dict.get("server_cmd_password")

        if not tel_IP or not tel_port:
            logger.error("Telnet_IP or Telnet_Port is NULL !!")
            return False
        if not tftp_ip:
            tftp_ip = getHostIp()
            logger.info("get host ip, tftp_ip is %s" % tftp_ip)
        if not device_netmask:
            device_netmask = "255.255.252.0"
        if not device_Mac:
            device_Mac = "3a:82:d0:08:f4:99"
        if not tel_cmd_port:
            tel_cmd_port = "18984"
        local_image_path = "%s/%s" % (version_name, "img")

        if self.params_dict.get("upgrade_script"):
            script_name = self.params_dict.get("upgrade_script").split('\\')[-1]
            scriptfile = os.path.join(version_savepath, "scriptfile", script_name)
        else:
            scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
            if flash_type and "ev" in flash_type:
                scriptfile = os.path.join(scriptpath, "resource", "L1", "ev200", "update.txt")
            elif flash_type and "dv" in flash_type:
                scriptfile = os.path.join(scriptpath, "resource", "L1", "dv300", "update.txt")
            else:
                # Without this branch scriptfile would be unbound below.
                logger.error("no default upgrade script for flash_type: %s" % flash_type)
                return False
        logger.info("upgrade scriptfile is: %s" % scriptfile)
        if not os.path.exists(scriptfile):
            logger.error("%s is not exit" % scriptfile)
            return False

        # Script placeholders and their values, substituted in this order
        placeholders = {
            "%Tftpserver_IP%": tftp_ip,
            "%Device_IP%": device_ip,
            "%Device_MAC%": device_Mac,
            "%Device_Netmask%": device_netmask,
            "%Device_GatewayIP%": device_gatewayip,
        }
        image_dir = os.path.join(version_savepath, "img")

        tnc = TelConnect(tel_IP, tel_port)
        try:
            board_type = tnc.getBoardType(cmd_finish, READ_MINITIMEOUT)
            if board_type != "uboot" and not tnc.sendResetCmd(uboot_finish, READ_TIMEOUT):
                # Reset did not reach the u-boot prompt: power-cycle the board
                if not power_port:
                    logger.error("go hisilicon # fail, please check board status")
                    return False
                if not recoveryBoard(tnc, tel_IP, tel_cmd_port, power_port, tel_username, tel_passwd or ''):
                    logger.error("recovery board fail, please check board")
                    return False

            with open(scriptfile, "r") as fp:
                for line in fp:
                    line = line.strip()
                    if not line:
                        continue
                    logger.info("cmd is: %s " % line)

                    for placeholder, value in placeholders.items():
                        if placeholder not in line:
                            continue
                        if value is None:
                            logger.error("%s is used by the upgrade script but is not configured" % placeholder)
                            return False
                        line = line.replace(placeholder, value)

                    # Fall back to the jffs2 images when the plain ones are absent
                    if "userfs.img" in line:
                        if not os.path.exists(os.path.join(image_dir, "userfs.img")):
                            line = line.replace("userfs.img", "userfs_jffs2.img")
                    if "rootfs.img" in line:
                        if not os.path.exists(os.path.join(image_dir, "rootfs.img")):
                            line = line.replace("rootfs.img", "rootfs_jffs2.img")

                    if "tftp" in line:
                        # Point "/<flash_type>" in the script at "/<version_name>/img"
                        packagefile_path = "/%s" % flash_type
                        new_packagefile_path = "/%s" % local_image_path
                        line = line.replace(packagefile_path, new_packagefile_path)
                        if not tnc.sendUpgradeCmd(line, uboot_finish, "done", READ_MAXTIMEOUT):
                            return False
                        continue
                    if "setenv" in line:
                        if not tnc.sendUpgradeCmd(line, uboot_finish, "", READ_MINITIMEOUT):
                            return False
                        continue
                    if "EXIT" in line:
                        break
                    if "go " in line:
                        if not tnc.sendUpgradeCmd(line, cmd_finish, "", READ_TIMEOUT):
                            logger.error("go OHOS # fail, please check board status")
                            return False
                        continue
                    if not tnc.sendUpgradeCmd(line, cmd_finish, "", READ_TIMEOUT):
                        return False

            board_type = tnc.getBoardType(cmd_finish, READ_MINITIMEOUT)
            if board_type == "OHOS":
                logger.info("upgrade success")
                return True
            logger.info("upgrade fail")
            return False
        except Exception as e:
            logger.printLog(e)
            return False
        finally:
            tnc.close()
            logger.info("close telnet")
318
def recoveryBoard(tnc, telip, cmdport, powerport, username, passwd=''):
    '''
    Power-cycle the board through serialPowerOnOff and check that it answers.

    The board is switched off, then on, an Enter is sent over tnc, and the
    board type is probed. The recovery counts as failed when any of these
    steps fails or when the probed board type is "bootrom".

    @Param:  tnc:       telnet connection used for sendEnterCmd / getBoardType
             telip, cmdport, powerport, username, passwd:
                        forwarded unchanged to serialPowerOnOff
    @return: True or False
    '''
    logger.info("start recovery board")

    # Power off first, then power on; stop at the first step that fails.
    power_sequence = (
        ("off", "board power off failed"),
        ("on", "board power on failed"),
    )
    for action, failure_msg in power_sequence:
        switched = serialPowerOnOff(telip, cmdport, powerport, action, username, passwd)
        if not switched:
            logger.error(failure_msg)
            return False

    # Then send an Enter and wait for the prompt.
    got_prompt = tnc.sendEnterCmd(" #", READ_MINITIMEOUT)
    if not got_prompt:
        logger.error("send Enter failed")
        return False

    if tnc.getBoardType(cmd_finish, READ_MINITIMEOUT) == "bootrom":
        logger.error("recovery board failed")
        return False

    logger.info("recovery board succ")
    return True
339
340
if __name__ == "__main__":
    # Check the argument count first: indexing sys.argv[1] without an argument
    # raises IndexError before the "Missing params file" message can be logged.
    if len(sys.argv) < 2 or not sys.argv[1]:
        logger.printLog("Missing params file")
        sys.exit(-1)
    param_file = sys.argv[1]
    try:
        uphandle = liteOsUpgrade_L1(param_file)
        uphandle._excuteApp()
    except Exception as e:
        # Top-level boundary: log the failure and exit with an error status.
        logger.printLog(e)
        sys.exit(-1)
352