• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6import serial
7import time
8import re
9
10from core.base import BaseApp, dec_stepmsg
11from util.file_locker import FileLock
12from util.log_info import logger
13from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
14from aw.Telnet.TelnetClient import TelConnect
15from aw.Common.Constant import CONSTANT
16from aw.Download.Download import *
17from aw.Common.Common import getHostIp, copyFile, copyDirectory
18from aw.ExtractFile.ExtractFile import *
19from aw.poweronoff.serial_power_on_off import usbPowerOnOff
20from threading import Thread
21
22lock_suffix = CONSTANT.File.LOCK_SUFFIX  #通过文件锁实现并发下载
23suc_file = CONSTANT.File.SUC_FILE        #通过本文件是区分版本是否成功下载
24failed_file = CONSTANT.File.FAILED_FILE  #通过本文件是标记文件下载失败
25READ_MAXTIMEOUT = 5
26READ_TIMEOUT = 5
27READ_MINITIMEOUT = 2
28uboot_finish = 'hisilicon #'
29cmd_finish = ' #'
30error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败']
31ip_cmd = 'ping 192.168.18.1'
32
33class liteOsUpgrade_linux(BaseApp):
34    '''
35    @author: cwx1076044
36    '''
37
38    def __init__(self, param_file):
39        super().__init__(param_file)
40        self.param_List = ["deploy_com",
41                        "usb_port",
42                           "upgrade_upgradeLocation"]
43
44    @dec_stepmsg("linux L1 flash")
45    def excute(self):
46        '''
47        #===================================================================================
48        #   @Method:        excute(self)
49        #   @Precondition:  none
50        #   @Func:          升级执行入口
51        #   @PostStatus:    none
52        #   @eg:            excute()
53        #   @return:        True or Flase
54        #===================================================================================
55        '''
56        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_linux_app")
57
58        # 执行下载
59        try:
60            if not self.download():
61                CONSTANT.ENVERRMESSAGE = "image download fail"
62                logger.printLog(CONSTANT.ENVERRMESSAGE)
63                return False
64        except Exception as e:
65            raise e
66
67
68        # 执行升级
69        try:
70            if not self.upgrade():
71                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
72                logger.printLog(CONSTANT.ENVERRMESSAGE)
73                return False
74            return True
75        except Exception as e:
76            raise e
77
78    @dec_stepmsg("download")
79    @timeout(1800)
80    def download(self):
81        '''
82        #===================================================================================
83        #   @Method:        download(self)
84        #   @Precondition:  none
85        #   @Func:          构建下载到本地的路径,执行相应包的下载
86        #   @PostStatus:    none
87        #   @eg:            download()
88        #   @return:        True or Flase
89        #===================================================================================
90        '''
91        global version_savepath, version_name
92        dir_path = CONSTANT.Path.getDirPath()
93        if self.params_dict.get("pbiid"):
94            version_path = self.params_dict.get("pbiid")
95            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
96            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
97        else:
98            version_path = self.params_dict.get("upgrade_upgradeLocation")
99            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
100            version_savepath = os.path.join(dir_path, version_name)
101
102        if self.params_dict.get("isDownload") == "True":
103            logger.printLog("不需要做下载,直接返回")
104            return True
105
106        #执行img下载
107        import hashlib
108        save_file_str = version_path.replace("/", "").replace("\\", "")
109        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
110        logger.info("download hash value:%s" % (save_file_name))
111        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
112        if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False):
113            logger.error("download img fail")
114            return False
115
116        #保存本地版本路径给devicetest去版本路径下取用例
117        saveVersion(save_path_file, os.path.join(version_savepath, "img"))
118
119        return True
120
121    def excutedown(self, source_path, download_dir, suc_mark, is_file):
122        '''
123        #===================================================================================
124        #   @Method:        excutedown(source_path, download_dir, is_file)
125        #   @Precondition:  none
126        #   @Func:          执行下载动作
127        #   @PostStatus:    none
128        #   @Param:         source_path:资源文件路径
129        #                   download_dir:文件下载到本地的文件夹路径
130        #                   is_file:是否是文件
131        #
132        #   @eg:            excutedown("xxxx", "D:\\local\\image", Flase, os_method)
133        #   @return:        True or Flase
134        #===================================================================================
135        '''
136        failed_mark = os.path.join(download_dir, failed_file)
137        lock_path = os.path.join(download_dir, lock_suffix)
138        file_lock = FileLock()
139
140        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
141            return True
142        try:
143            nowtime = get_now_time_str_info()
144            logger.printLog("%s Downloading, please wait" % nowtime)
145            file_lock.lockFile(lock_path)
146            ret = ""
147            logger.info("Get lock. Start to ")
148            if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
149                ret = downloadByBitComet(source_path, download_dir, os_method)
150            elif source_path.startswith('\\\\'):
151                ret = downloadByCopy(source_path, download_dir, is_file)
152            elif self.params_dict.get("pbiid"):
153                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
154            elif source_path.startswith("http"):
155                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)
156
157            if source_path.endswith(".zip"):
158                zip_name = os.path.basename(source_path)
159                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
160            if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)):
161                if source_path.startswith("http") and ("file_id=" in source_path):
162                    if source_path.endswith(".tar.gz"):
163                        zip_name = source_path.split('=')[-1]
164                    else:
165                        zip_name = "out.tar.gz"
166                else:
167                    zip_name = os.path.basename(source_path)
168                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
169            nowtime = get_now_time_str_info()
170            logger.printLog("%s download to %s end" % (nowtime, download_dir))
171
172            if not ret:
173                with open(failed_mark, "a+") as fp:
174                    fp.write("")
175            return ret
176        except Exception as e:
177            logger.printLog(e)
178            raise Exception(e)
179        finally:
180            file_lock.releaseFile()
181
182
183    @dec_stepmsg("upgrade")
184    #@timeout(900)
185    def upgrade(self):
186        '''
187        #===================================================================================
188        #   @Method:        upgrade(self)
189        #   @Precondition:  none
190        #   @Func:          升级相关业务逻辑
191        #   @PostStatus:    none
192        #   @eg:            upgrade()
193        #   @return:        True or Flase
194        #===================================================================================
195        '''
196        logger.printLog('开始升级')
197        deploy_com = self.params_dict.get("deploy_com")
198        usb_port = self.params_dict.get("usb_port")
199        baudrate = self.params_dict.get("baudrate")
200        #芯片类型,根据芯片类型获取对应的刷机命令
201        flash_type = self.params_dict.get("flash_type")
202        burn_usbport = self.params_dict.get("hiburn_usbport")
203        device_ip = self.params_dict.get("Device_IP")
204        device_netmask = self.params_dict.get("Device_Netmask")
205        device_gatewayip = self.params_dict.get("Device_GatewayIP")
206        chip_version = self.params_dict.get('chip_version')
207        chip_version = chip_version.lower() if chip_version else chip_version
208
209        if not deploy_com:
210            logger.error("deploy_com is NULL !!")
211            return False
212        if not burn_usbport:
213            logger.error("hiburn_usbport is NULL !!")
214            return False
215        if not baudrate:
216            baudrate = 115200
217        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
218        #升级需要的工具归档
219        toolworkspace = CONSTANT.OSType.getworkspace()
220        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
221        logger.info("hiburn tool path is: %s" % hiburntoolpath)
222        if not os.path.exists(hiburntoolpath):
223            if not burn_usbport:
224                logger.error("hiburn_usbport is NULL !!")
225                return False
226            os.makedirs(hiburntoolpath)
227            toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
228            logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
229            copyFile(toolpath, hiburntoolpath)
230            zip_name = os.path.basename(toolpath)
231            ret = extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath)
232            if ret:
233                logger.info("unzip to %s succ" % (hiburntoolpath))
234                #修改burn.config中的usb口
235                configpath = os.path.join(hiburntoolpath, "config", "burn.config")
236                all_data = ""
237                with open(configpath, "r", encoding="utf-8") as cf:
238                    for line in cf:
239                        if "usbDeviceNumber=" in line:
240                            old_str = line
241                            line = line.replace(old_str, "usbDeviceNumber=%s\r\n" % burn_usbport)
242                            logger.info("replace line: %s " % line)
243                        all_data += line
244                with open(configpath, "w", encoding="utf-8") as wf:
245                    wf.write(all_data)
246            else:
247                logger.error("%s is not exit" % hiburntoolpath)
248                return False
249        #将升级需要的文件拷贝到镜像里面
250        local_image_path = os.path.join(version_savepath, "img")
251        old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn.xml")
252        xml_path = os.path.join(local_image_path, "usb-burn.xml")
253        copyFile(old_xml_path, xml_path)
254        if flash_type.lower() == "ev300":
255            chip_type = "Hi3518EV300"
256            ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3518ev300.bin")
257        elif flash_type.lower() == "dv300_linux":
258            chip_type = "Hi3516DV300"
259            ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3516dv300.bin")
260        else:
261            logger.error("flash_type is : %s " % flash_type)
262            return False
263        copyFile(ubootpath, local_image_path)
264        scriptfile = os.path.join(scriptpath, "resource", "L1", f'{flash_type.lower()}', "update.txt")
265        logger.info(f'scriptfile:{scriptfile}')
266
267        current_path = os.getcwd()
268        logger.info("before excute hiburn,current path is: %s" % current_path)
269        os.chdir(hiburntoolpath)
270        logger.info("excute hiburn path is: %s" % os.getcwd())
271        #擦除fastboot
272        logger.printLog('erase fastboot')
273        flash_uboot_xml = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "flash_fastboot.xml")
274        cmd = ".\jre\\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
275        self.eraseDevice(cmd, usb_port)
276
277        retry = 0
278        while retry < 3:
279            #usb刷机
280            cmd = ".\jre\\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
281            logger.info("cmd is: %s" % cmd)
282            ret, outpri = subprocess.getstatusoutput(cmd)
283            logger.info("usb upgrade result: %s " % ret)
284            logger.info("print console: %s " % outpri)
285            if ret != 0:
286                if ret == 4 and retry < 2:
287                    time.sleep(10)
288                    retry = retry + 1
289                    logger.info('flash fail,so flash once again')
290                    continue
291                logger.info(ret)
292                logger.error("hiburn usb upgrade failed!!")
293                return False
294            retry = retry + 3
295        os.chdir(current_path)
296        logger.info("hiburn upgrade end, check board status")
297        time.sleep(10)
298        try:
299            logger.info("打开serial")
300            ser = serial.Serial(deploy_com, int(baudrate), timeout=0)
301            if self.bootUpUboot(ser,scriptfile) and self.configureIp(ser, flash_type, device_ip, device_netmask, device_gatewayip) and self.checkStatus(ser):
302                return True
303            return False
304        except Exception as e:
305            logger.info(e)
306            return False
307        finally:
308            ser.close()
309            logger.info("close serial")
310
311    def bootUpUboot(self,ser,scriptfile):
312        '''
313        启动uboot
314        '''
315        reset_count = 0
316        while reset_count < 2:
317            if ser.is_open == False:
318                ser.open()
319            logger.info("get device status")
320            board_type = getBoardType(ser)
321            if board_type == "uboot":
322                with open(scriptfile, "r") as fp:
323                    lines = fp.readlines()
324                for line in lines:
325                    if not line:
326                        logger.info("cmd is: %s " % line)
327                        continue
328                    if "reset" in line:
329                        ret = sendCmd(ser, line, READ_MAXTIMEOUT)
330                        continue
331                    ret = sendCmd(ser, line, READ_MINITIMEOUT)
332                board_type = getBoardType(ser)
333                if board_type != "OHOS":
334                    if  reset_count < 2:
335                        logger.info('after reset;the device status is error,reset device again,reset_count:%d' % reset_count)
336                        reset_count += 1
337                        time.sleep(20)
338                        continue
339                    logger.error("upgrade fail")
340                    return False
341                return True
342            else:
343                logger.info('before reset;the device status is error,reset device again,reset_count:%d'%reset_count)
344                reset_count += 1
345                time.sleep(20)
346                continue
347        else:
348            return False
349
350    def configureIp(self,ser,flash_type, device_ip, device_netmask, device_gatewayip):
351        '''
352        配置ip,确认配置情况
353        '''
354        rerty_count = 0
355        # test_count = 0
356        while rerty_count <= 2:
357            if flash_type.lower() == "dv300_linux":
358                if not self.afterRebootDvConfigure(ser, device_ip, device_netmask, device_gatewayip):
359                    return False
360            elif flash_type.lower() == "ev300":
361                if not self.afterRebootEvConfigure(ser):
362                    return False
363            time.sleep(5)
364
365            ret = sendCmd(ser, ip_cmd, READ_MINITIMEOUT)
366            logger.info(ret)
367            # if test_count ==0 :
368            #     logger.printLog('ip 配置失败')
369            #     test_count = 1
370            #     continue
371            if 'Reply from 192.168.18.1' in ret:
372                logger.info('ip 配置成功')
373                return True
374            elif 'Ping: sending ICMP echo request failed' in ret:
375                logger.printLog('ip 配置失败')
376                logger.info('重新配置ip')
377                rerty_count += 1
378            else:
379                return True
380
381    def afterRebootDvConfigure(self,ser, device_ip, device_netmask, device_gatewayip):
382        logger.info("dv300_linux configure ip")
383        init_cmd = "ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip)
384        sendCmd(ser, init_cmd, READ_MINITIMEOUT)
385        sendCmd(ser, 'ifconfig\r', READ_MINITIMEOUT)
386        return True
387
388    def afterRebootEvConfigure(self,ser):
389        logger.info("ev300 configuring ,setup wifi")
390        cmd = 'ls\r'
391        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
392        if not "sdcard" in ret.lower():
393            cmd = 'mkdir /sdcard\r'
394            ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
395            if "error:" in ret.lower():
396                logger.error("mkdir /sdcard fail")
397                return False
398        cmd = 'mount /dev/mmcblk0p0 /sdcard vfat\r'
399        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
400        cmd = 'ls /sdcard\r'
401        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
402        cmd = 'cd /sdcard/wpa\r'
403        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
404        cmd = 'exec wpa_supplicant -i wlan0 -c wpa_supplicant.conf \r'
405        ret = sendCmd(ser, cmd, READ_MAXTIMEOUT)
406        if "error:" in ret.lower():
407            logger.error("setup wifi fail")
408            return False
409        cmd = 'ifconfig\r'
410        ret = sendCmd(ser, cmd, READ_MINITIMEOUT)
411        if "error:" in ret.lower():
412            logger.error("ifconfig fail")
413            return False
414        return True
415
416    def eraseDevice(self, cmd, usb_port):
417        '''
418        ret 暂时没有作用,先使用out
419        '''
420        erase_retry = 0
421        while erase_retry < 3:
422            # 通电
423            PowerOnByThread(usb_port)
424            logger.info("cmd is: %s" % cmd)
425            ret, outpri = subprocess.getstatusoutput(cmd)
426            logger.info("flash fastboot result: %s " % ret)
427            logger.info("print console: %s " % outpri)
428            is_earse_again = any([True if item in outpri else False for item in error_str_list])
429            if ret == 0 and erase_retry < 2 and not is_earse_again :
430                logger.info('檫除成功'.center(20,'*'))
431                break
432            elif is_earse_again:
433                logger.info('檫除存在问题 重新上下电 重新檫除')
434                erase_retry += 1
435                time.sleep(5)
436                continue
437            else:
438                logger.info('other error')
439                return False
440        else:
441            return False
442
443    def checkStatus(self, ser):
444        times = 0
445        while times < 3:
446            ret1 = sendCmd(ser, 'ps -elf', READ_TIMEOUT)
447            list_ret = re.findall('appspawn', ret1)
448            logger.info(list_ret)
449            number = list_ret.count('appspawn')
450            logger.info(number)
451            if number >= 2:
452                return True
453            else:
454                times += 1
455                time.sleep(3)
456        logger.info("No two appspawn processes, please check the device!")
457        return False
458
459def PowerOnByThread(usb_port,wait_time=10):
460    thread = Thread(target=boardPowerOn, args=[usb_port, wait_time])
461    thread.start()
462    logger.info("thread board power on start")
463
464
465def boardPowerOn(usb_port, waittime):
466    logger.info("board power on start")
467    time.sleep(waittime)
468
469    #对端口下电
470    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"):
471        logger.error("board power off failed")
472        return False
473
474    #对端口上电
475    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"):
476        logger.error("board power on failed")
477        return False
478    logger.info("board power on end")
479
480
481def getBoardType(ser):
482    ret = sendCmd(ser, '\r', READ_TIMEOUT)
483    # if 'HMOS' in ret or 'OHOS' in ret or '#' in ret:
484    #     ostype = 'OHOS'
485    # elif 'hisilicon' in ret:
486    #     ostype = 'uboot'
487    if 'hisilicon' in ret:
488        ostype = 'uboot'
489    elif 'HMOS' in ret or 'OHOS' in ret or '#' in ret:
490        ostype = 'OHOS'
491    elif ' #' in ret:
492        ostype = 'linux'
493    else:
494        ostype = 'bootrom'
495    logger.info("board type is: %s" % ostype)
496    return ostype
497
498def sendCmd(ser, cmd, timeout):
499    logger.info("cmd is: %s " % cmd)
500    ser.write((cmd + '\n').encode())
501    time.sleep(5)
502    ret = ''
503    i = 0
504    while True:
505        out = ser.read(ser.inWaiting())
506        if not out:
507            break
508        if i > 12:
509            break
510        ret = ret + out.decode(encoding="utf-8", errors="ignore")
511        time.sleep(timeout)
512        i = i + 1
513    logger.info("result is: %s " % ret)
514    return ret
515
516
517
518if __name__ == "__main__":
519    param_file = sys.argv[1]
520    if not param_file:
521        logger.printLog("Missing params file")
522        sys.exit(-1)
523    try:
524        uphandle = liteOsUpgrade_linux(param_file)
525        uphandle._excuteApp()
526    except Exception as e:
527        logger.printLog(e)
528        sys.exit(-1)
529
530