• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6import serial
7
8from core.base import BaseApp, dec_stepmsg
9from util.file_locker import FileLock
10from util.log_info import logger
11from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
12from aw.Telnet.TelnetClient import TelConnect
13from aw.Common.Constant import CONSTANT
14from aw.Download.Download import *
15from aw.Common.Common import getHostIp, copyFile, copyDirectory
16from aw.ExtractFile.ExtractFile import *
17from aw.poweronoff.serial_power_on_off import usbPowerOnOff
18
19lock_suffix = CONSTANT.File.LOCK_SUFFIX  #通过文件锁实现并发下载
20suc_file = CONSTANT.File.SUC_FILE        #通过本文件是区分版本是否成功下载
21failed_file = CONSTANT.File.FAILED_FILE  #通过本文件是标记文件下载失败
22READ_MAXTIMEOUT = 60
23READ_TIMEOUT = 30
24READ_MINITIMEOUT = 5
25uboot_finish = 'hisilicon #'
26cmd_finish = ' #'
27
28class liteOsUpgrade_L1_shequ_test(BaseApp):
29    '''
30    @author: w00278233
31    '''
32
33    def __init__(self, param_file):
34        super().__init__(param_file)
35        self.param_List = ["deploy_com",
36                        "usb_port",
37                           "upgrade_upgradeLocation"]
38
39    @dec_stepmsg("hongmeng L1 flash")
40    def excute(self):
41        '''
42        #===================================================================================
43        #   @Method:        excute(self)
44        #   @Precondition:  none
45        #   @Func:          升级执行入口
46        #   @PostStatus:    none
47        #   @eg:            excute()
48        #   @return:        True or Flase
49        #===================================================================================
50        '''
51        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_L1_shequ_test_app")
52
53        # 执行下载
54        try:
55            if not self.download():
56                CONSTANT.ENVERRMESSAGE = "image download fail"
57                logger.printLog(CONSTANT.ENVERRMESSAGE)
58                return False
59        except Exception as e:
60            raise e
61
62
63        # 执行升级
64        try:
65            if not self.upgrade():
66                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
67                logger.printLog(CONSTANT.ENVERRMESSAGE)
68                return False
69            return True
70        except Exception as e:
71            raise e
72
73    @dec_stepmsg("download")
74    @timeout(1800)
75    def download(self):
76        '''
77        #===================================================================================
78        #   @Method:        download(self)
79        #   @Precondition:  none
80        #   @Func:          构建下载到本地的路径,执行相应包的下载
81        #   @PostStatus:    none
82        #   @eg:            download()
83        #   @return:        True or Flase
84        #===================================================================================
85        '''
86        global version_savepath, version_name
87        dir_path = CONSTANT.Path.getDirPath()
88        if self.params_dict.get("pbiid"):
89            version_path = self.params_dict.get("pbiid")
90            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
91            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
92        else:
93            version_path = self.params_dict.get("upgrade_upgradeLocation")
94            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
95            version_savepath = os.path.join(dir_path, version_name)
96
97        if self.params_dict.get("isDownload") == "True":
98            logger.printLog("不需要做下载,直接返回")
99            return True
100
101        #执行img下载
102        import hashlib
103        save_file_str = version_path.replace("/", "").replace("\\", "")
104        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
105        logger.info("download hash value:%s" % (save_file_name))
106        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
107        if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False):
108            logger.error("download img fail")
109            return False
110
111        #保存本地版本路径给devicetest去版本路径下取用例
112        saveVersion(save_path_file, os.path.join(version_savepath, "img"))
113
114        return True
115
116    def excutedown(self, source_path, download_dir, suc_mark, is_file):
117        '''
118        #===================================================================================
119        #   @Method:        excutedown(source_path, download_dir, is_file)
120        #   @Precondition:  none
121        #   @Func:          执行下载动作
122        #   @PostStatus:    none
123        #   @Param:         source_path:资源文件路径
124        #                   download_dir:文件下载到本地的文件夹路径
125        #                   is_file:是否是文件
126        #
127        #   @eg:            excutedown("xxxx", "D:\\local\\image", Flase, os_method)
128        #   @return:        True or Flase
129        #===================================================================================
130        '''
131        failed_mark = os.path.join(download_dir, failed_file)
132        lock_path = os.path.join(download_dir, lock_suffix)
133        file_lock = FileLock()
134
135        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
136            return True
137        try:
138            nowtime = get_now_time_str_info()
139            logger.printLog("%s Downloading, please wait" % nowtime)
140            file_lock.lockFile(lock_path)
141            ret = ""
142            logger.info("Get lock. Start to ")
143            if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
144                ret = downloadByBitComet(source_path, download_dir, os_method)
145            elif source_path.startswith('\\\\'):
146                ret = downloadByCopy(source_path, download_dir, is_file)
147            elif self.params_dict.get("pbiid"):
148                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
149            elif source_path.startswith("http"):
150                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)
151
152            if source_path.endswith(".zip"):
153                zip_name = os.path.basename(source_path)
154                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
155            if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)):
156                if source_path.startswith("http") and ("file_id=" in source_path):
157                    if source_path.endswith(".tar.gz"):
158                        zip_name = source_path.split('=')[-1]
159                    else:
160                        zip_name = "out.tar.gz"
161                else:
162                    zip_name = os.path.basename(source_path)
163                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
164            nowtime = get_now_time_str_info()
165            logger.printLog("%s download to %s end" % (nowtime, download_dir))
166
167            if not ret:
168                with open(failed_mark, "a+") as fp:
169                    fp.write("")
170            return ret
171        except Exception as e:
172            logger.printLog(e)
173            raise Exception(e)
174        finally:
175            file_lock.releaseFile()
176
177
178    @dec_stepmsg("upgrade")
179    #@timeout(900)
180    def upgrade(self):
181        '''
182        #===================================================================================
183        #   @Method:        upgrade(self)
184        #   @Precondition:  none
185        #   @Func:          升级相关业务逻辑
186        #   @PostStatus:    none
187        #   @eg:            upgrade()
188        #   @return:        True or Flase
189        #===================================================================================
190        '''
191        logger.printLog('开始升级')
192        deploy_com = self.params_dict.get("deploy_com")
193        usb_port = self.params_dict.get("usb_port")
194        baudrate = self.params_dict.get("baudrate")
195        #芯片类型,根据芯片类型获取对应的刷机命令
196        flash_type = self.params_dict.get("flash_type")
197        burn_usbport = self.params_dict.get("hiburn_usbport")
198        device_ip = self.params_dict.get("Device_IP")
199        device_netmask = self.params_dict.get("Device_Netmask")
200        device_gatewayip = self.params_dict.get("Device_GatewayIP")
201        chip_version = self.params_dict.get('chip_version')
202        chip_version = chip_version.lower() if chip_version else chip_version
203
204        if not deploy_com:
205            logger.error("deploy_com is NULL !!")
206            return False
207        if not burn_usbport:
208            logger.error("hiburn_usbport is NULL !!")
209            return False
210        if not baudrate:
211            baudrate = 115200
212        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
213        #升级需要的工具归档
214        logger.printLog('归档工具')
215        toolworkspace = CONSTANT.OSType.getworkspace()
216        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
217        logger.info("hiburn tool path is: %s" % hiburntoolpath)
218        if not os.path.exists(hiburntoolpath):
219            if not burn_usbport:
220                logger.error("hiburn_usbport is NULL !!")
221                return False
222            os.makedirs(hiburntoolpath)
223            toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
224            logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
225            copyFile(toolpath, hiburntoolpath)
226            zip_name = os.path.basename(toolpath)
227            ret = extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath)
228            if ret:
229                logger.info("unzip to %s succ" % (hiburntoolpath))
230                #修改burn.config中的usb口
231                configpath = os.path.join(hiburntoolpath, "config", "burn.config")
232                all_data = ""
233                with open(configpath, "r", encoding="utf-8") as cf:
234                    for line in cf:
235                        if "usbDeviceNumber=" in line:
236                            old_str = line
237                            line = line.replace(old_str, "usbDeviceNumber=%s\r\n" % burn_usbport)
238                            logger.info("replace line: %s " % line)
239                        all_data += line
240                with open(configpath, "w", encoding="utf-8") as wf:
241                    wf.write(all_data)
242            else:
243                logger.error("%s is not exit" % hiburntoolpath)
244                return False
245        #将升级需要的文件拷贝到镜像里面
246        logger.printLog('拷贝镜像')
247        local_image_path = os.path.join(version_savepath, "img")
248        old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn.xml")
249        xml_path = os.path.join(local_image_path, "usb-burn.xml")
250        if chip_version == 'hi3518':
251            old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn-jffs2.xml")
252        elif chip_version == 'hi3516':
253            old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn-vfat.xml")
254        copyFile(old_xml_path, xml_path)
255        if flash_type.lower() == "ev300":
256            chip_type = "Hi3518EV300"
257            ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3518ev300.bin")
258        elif flash_type.lower() == "dv300":
259            chip_type = "Hi3516DV300"
260            ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3516dv300.bin")
261        else:
262            logger.error("flash_type is : %s " % flash_type)
263            return False
264        copyFile(ubootpath, local_image_path)
265        scriptfile = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "update.txt")
266
267        from threading import Thread
268        thread = Thread(target=boardPowerOn, args=[usb_port, 10])
269        thread.start()
270
271        current_path = os.getcwd()
272        logger.info("before excute hiburn,current path is: %s" % current_path)
273        os.chdir(hiburntoolpath)
274        logger.info("excute hiburn path is: %s" % os.getcwd())
275        #擦除fastboot
276        logger.printLog('擦除fastboot')
277        flash_uboot_xml = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "flash_fastboot.xml")
278        cmd = ".\jre\\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
279        logger.info("cmd is: %s" % cmd)
280        ret, outpri = subprocess.getstatusoutput(cmd)
281        logger.info("flash fastboot result: %s " % ret)
282        logger.info("print console: %s " % outpri)
283
284        retry = 0
285        while retry < 2:
286
287            #usb刷机
288            logger.printLog('开始刷机')
289            cmd = ".\jre\\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
290            logger.info("cmd is: %s" % cmd)
291            ret, outpri = subprocess.getstatusoutput(cmd)
292            logger.info("usb upgrade result: %s " % ret)
293            logger.info("print console: %s " % outpri)
294            if ret != 0:
295                if ret == 4:
296                    time.sleep(10)
297                    retry = retry + 1
298                    logger.printLog('刷机未成功再刷一次')
299                    continue
300                logger.printLog(ret)
301                logger.error("hiburn usb upgrade failed!!")
302                return False
303            retry = retry + 3
304        os.chdir(current_path)
305        logger.info("hiburn upgrade end, check board status")
306        time.sleep(5)
307        ser = serial.Serial(deploy_com, int(baudrate), timeout=1)
308        reset_count = 0
309        try:
310            while reset_count < 2:
311                logger.info("打开serial")
312                if ser.is_open == False:
313                    ser.open()
314
315                logger.info("获取单板状态")
316                board_type = getBoardType(ser)
317                logger.printLog('before reset')
318                logger.printLog(f'board_type:{board_type}')
319                if board_type == "uboot":
320                    with open(scriptfile, "r") as fp:
321                        lines = fp.readlines()
322                    for line in lines:
323                        if not line:
324                            logger.info("cmd is: %s " % line)
325                            continue
326                        if "reset" in line:
327                            ret = sendCmd(ser, line, READ_MAXTIMEOUT)
328                            continue
329                        ret = sendCmd(ser, line, READ_MINITIMEOUT)
330                    board_type = getBoardType(ser)
331                    logger.printLog('after reset')
332                    logger.printLog(f'board_type:{board_type}')
333
334                    if True:
335                        reset_count += 1
336                        logger.printLog('再次重启')
337                        continue
338                    time.sleep(1000)
339                    if flash_type.lower() == "dv300":
340                        logger.info("upgrade success")
341                        init_cmd = "ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip)
342                        sendCmd(ser, init_cmd, READ_MINITIMEOUT)
343                        sendCmd(ser, 'ifconfig\r', READ_MINITIMEOUT)
344                        return True
345                    elif flash_type.lower() == "ev300":
346                        logger.info("setup wifi")
347                        cmd = 'ls\r'
348                        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
349                        if not "sdcard" in ret.lower():
350                            cmd = 'mkdir /sdcard\r'
351                            ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
352                            if "error:" in ret.lower():
353                                logger.error("mkdir /sdcard fail")
354                                return False
355                        cmd = 'mount /dev/mmcblk0p0 /sdcard vfat\r'
356                        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
357                        cmd = 'ls /sdcard\r'
358                        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
359                        cmd = 'cd /sdcard/wpa\r'
360                        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
361                        cmd = 'exec wpa_supplicant -i wlan0 -c wpa_supplicant.conf \r'
362                        ret = sendCmd(ser, cmd, READ_MAXTIMEOUT)
363                        if "error:" in ret.lower():
364                            logger.error("setup wifi fail")
365                            return False
366                        cmd = 'ifconfig\r'
367                        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
368                        if "error:" in ret.lower():
369                            logger.error("ifconfig fail")
370                            return False
371                        logger.info("upgrade success")
372                        return True
373            else:
374                logger.error("upgrade fail")
375                return False
376        except Exception as e:
377            logger.printLog(e)
378            return False
379        finally:
380            ser.close()
381            logger.info("close serial")
382
383
384def boardPowerOn(usb_port, waittime):
385    logger.info("board power on start")
386    time.sleep(waittime)
387
388    #对端口下电
389    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"):
390        logger.error("board power off failed")
391        return False
392
393    #对端口上电
394    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"):
395        logger.error("board power on failed")
396        return False
397    logger.info("board power on end")
398
399
400def getBoardType(ser):
401    ret = sendCmd(ser, '\r', READ_TIMEOUT)
402    if 'HMOS' in ret or 'OHOS' in ret:
403        ostype = 'OHOS'
404    elif 'hisilicon' in ret:
405        ostype = 'uboot'
406    elif ' #' in ret:
407        ostype = 'linux'
408    else:
409        ostype = 'bootrom'
410    logger.info("board type is: %s" % ostype)
411    return ostype
412
413def sendCmd(ser, cmd, timeout):
414    logger.info("cmd is: %s " % cmd)
415    ser.write((cmd + '\n').encode())
416    time.sleep(0.5)
417    ret = ''
418    i = 0
419    while True:
420        out = ser.read(ser.inWaiting())
421        if not out:
422            break
423        if i > 2:
424            break
425        ret = ret + out.decode(encoding="utf-8", errors="ignore")
426        time.sleep(timeout)
427        i = i + 1
428    logger.info("result is: %s " % ret)
429    return ret
430
431
432
433if __name__ == "__main__":
434    param_file = sys.argv[1]
435    if not param_file:
436        logger.printLog("Missing params file")
437        sys.exit(-1)
438    try:
439        uphandle = liteOsUpgrade_L1(param_file)
440        uphandle._excuteApp()
441    except Exception as e:
442        logger.printLog(e)
443        sys.exit(-1)
444