• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6import serial
7
8from core.base import BaseApp, dec_stepmsg
9from util.file_locker import FileLock
10from util.log_info import logger
11from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
12from aw.Telnet.TelnetClient import TelConnect
13from aw.Common.Constant import CONSTANT
14from aw.Download.Download import *
15from aw.Common.Common import getHostIp, copyFile, copyDirectory
16from aw.ExtractFile.ExtractFile import *
17from aw.poweronoff.serial_power_on_off import usbPowerOnOff
18from threading import Thread
19
lock_suffix = CONSTANT.File.LOCK_SUFFIX  # lock file name; concurrent downloads are serialized through a file lock
suc_file = CONSTANT.File.SUC_FILE        # marker file used to tell whether a version was downloaded successfully
failed_file = CONSTANT.File.FAILED_FILE  # marker file used to flag a failed download
READ_MAXTIMEOUT = 20    # not referenced in the visible part of this file
READ_TIMEOUT = 30       # passed by getBoardType() to sendCmd(), which sleeps this many seconds between reads
READ_MINITIMEOUT = 5    # not referenced in the visible part of this file
uboot_finish = 'hisilicon #'  # presumably the U-Boot prompt (getBoardType() maps 'hisilicon' to 'uboot') - TODO confirm
cmd_finish = ' #'             # presumably the shell prompt suffix (getBoardType() maps ' #' to 'linux') - TODO confirm
28
class liteOsUpgrade_L2(BaseApp):
    """Flash step for L2 boards (Hi3516DV300 / Hi3518EV300).

    The step first downloads the image package into a local cache directory
    and then erases fastboot and burns the image with the HiBurn command-line
    tool.
    """

    # flash_type (lower-cased) -> chip name handed to hiburn via "-n".
    _CHIP_TYPES = {
        "ev300": "Hi3518EV300",
        "dv300": "Hi3516DV300",
    }
    # Command prefix, relative to the hiburn tool directory (the process
    # changes into that directory before running it).
    _HIBURN_CMD = ".\\jre\\bin\\java -jar hiburn.jar"
    # Upper bound on erase attempts and on burn attempts.
    _MAX_ATTEMPTS = 3

    def __init__(self, param_file):
        """Initialise the step from ``param_file`` via the base class."""
        super().__init__(param_file)
        # NOTE(review): presumably the parameter names BaseApp treats as
        # required for this step - confirm against BaseApp.
        self.param_List = ["deploy_com",
                           "usb_port",
                           "upgrade_upgradeLocation"]

    @dec_stepmsg("hongmeng L2 flash")
    def excute(self):
        """Entry point of the step: download the image, then flash the board.

        Returns:
            True when both phases succeed, False as soon as one of them
            reports failure (``CONSTANT.ENVERRMESSAGE`` is set accordingly).

        Raises:
            Any exception raised by download() or upgrade() propagates
            unchanged.
        """
        if not self.download():
            CONSTANT.ENVERRMESSAGE = "image download fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False

        if not self.upgrade():
            CONSTANT.ENVERRMESSAGE = "board upgrade fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False
        return True

    @dec_stepmsg("download")
    @timeout(18000)
    def download(self):
        """Work out the local cache path and download the image package.

        Sets the module globals ``version_savepath`` and ``version_name``;
        upgrade() later reads ``version_savepath`` to locate the image.

        Returns:
            True when the image is available locally (or the parameter
            ``isDownload`` is "True", in which case nothing is downloaded),
            False otherwise.
        """
        global version_savepath, version_name
        import hashlib

        dir_path = CONSTANT.Path.getDirPath()
        pbiid = self.params_dict.get("pbiid")
        if pbiid:
            version_path = pbiid
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(pbiid) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            if not version_path:
                # Without a location there is nothing to derive the cache
                # directory from; report failure instead of raising.
                logger.error("upgrade_upgradeLocation is NULL !!")
                return False
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, version_path))
            version_savepath = os.path.join(dir_path, version_name)

        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            return True

        # The record file name is the SHA-1 of the source location with all
        # path separators removed.
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash value:%s" % (save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        logger.printLog('save_path_file: %s' % save_path_file)

        img_dir = os.path.join(version_savepath, "img")
        if not self.excutedown(version_path, img_dir, save_path_file, False):
            logger.error("download img fail")
            return False

        # Record the local image directory so that later consumers can find
        # the downloaded version.
        saveVersion(save_path_file, img_dir)
        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        """Download ``source_path`` into ``download_dir`` and unpack archives.

        The download is skipped when isDownLoadSuccess() already reports
        success. Otherwise the work runs while holding a file lock inside
        ``download_dir``; the lock is always released afterwards.

        Args:
            source_path: Location of the package (UNC path, http URL, ...).
            download_dir: Local directory that receives the files.
            suc_mark: Path passed to isDownLoadSuccess() as the success marker.
            is_file: Forwarded to downloadByCopy() for UNC sources.

        Returns:
            A truthy value on success. On failure a falsy value is returned
            and an (empty) failure-marker file is created in ``download_dir``.

        Raises:
            Any exception from the download/extract helpers is logged and
            re-raised with its original type.
        """
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            if self.params_dict.get("bt_enable") == "True":
                # NOTE(review): `os_method` is not defined anywhere in this
                # file; unless a wildcard import provides it, this branch
                # raises NameError. Confirm the intended argument.
                ret = downloadByBitComet(source_path, download_dir, os_method)
            elif source_path.startswith('\\\\'):
                ret = downloadByCopy(source_path, download_dir, is_file)
            elif self.params_dict.get("pbiid"):
                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"),
                                             "FASTBOOT", self.params_dict.get("pbiid"))
            elif source_path.startswith("http"):
                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)

            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)

            has_file_id = source_path.startswith("http") and "file_id=" in source_path
            if source_path.endswith(".tar.gz") or has_file_id:
                if not has_file_id:
                    zip_name = os.path.basename(source_path)
                elif source_path.endswith(".tar.gz"):
                    # The archive name is whatever follows the last '='.
                    zip_name = source_path.split('=')[-1]
                else:
                    zip_name = "out.tar.gz"
                logger.printLog(f'tar_file:{os.path.join(download_dir, zip_name)},dest_file:{download_dir}')
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)

            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # Touch the failure marker.
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            logger.printLog(e)
            # Re-raise as-is so the original type and traceback survive.
            raise
        finally:
            file_lock.releaseFile()

    @dec_stepmsg("upgrade")
    # @timeout(900)
    def upgrade(self):
        """Erase fastboot and burn the downloaded image onto the board.

        Reads ``deploy_com``, ``usb_port``, ``flash_type`` and
        ``hiburn_usbport`` from the step parameters and the module global
        ``version_savepath`` set by download().

        Returns:
            True when both the erase and the USB burn succeed, else False.
            The process working directory is restored in every case.
        """
        logger.printLog('开始升级')
        deploy_com = self.params_dict.get("deploy_com")
        usb_port = self.params_dict.get("usb_port")
        # Chip family; selects the chip name and the resource sub-directory.
        flash_type = self.params_dict.get("flash_type")
        burn_usbport = self.params_dict.get("hiburn_usbport")

        if not deploy_com:
            logger.error("deploy_com is NULL !!")
            return False
        if not burn_usbport:
            logger.error("hiburn_usbport is NULL !!")
            return False
        # Validate flash_type before it is used to build any path.
        chip_type = self._CHIP_TYPES.get(flash_type.lower()) if flash_type else None
        if chip_type is None:
            logger.error("flash_type is : %s " % flash_type)
            return False

        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(
            os.path.abspath(os.path.dirname(__file__)))))

        hiburntoolpath = self._prepare_hiburn_tool(scriptpath, burn_usbport)
        if hiburntoolpath is None:
            return False

        # Copy the partition description next to the downloaded image.
        local_image_path = os.path.join(version_savepath, "img")
        old_xml_path = os.path.join(scriptpath, "resource", "L2", flash_type, "Hi3516DV300-emmc.xml")
        xml_path = os.path.join(local_image_path, "Hi3516DV300-emmc.xml")
        copyFile(old_xml_path, xml_path)

        scriptfile = os.path.join(scriptpath, "resource", "L2", f'{flash_type.lower()}', "update.txt")
        logger.info(f'scriptfile:{scriptfile}')
        flash_uboot_xml = os.path.join(scriptpath, "resource", "L2", flash_type.lower(), "flash_fastboot.xml")

        current_path = os.getcwd()
        logger.info("before excute hiburn,current path is: %s" % current_path)
        # The hiburn command line is relative to the tool directory.
        os.chdir(hiburntoolpath)
        try:
            logger.info("excute hiburn path is: %s" % os.getcwd())
            logger.printLog('erase fastboot')
            if not self._erase_fastboot(chip_type, deploy_com, usb_port, flash_uboot_xml):
                return False
            if not self._burn_image(chip_type, xml_path):
                return False
        finally:
            # Restore the working directory on failure paths as well.
            os.chdir(current_path)
        logger.info("hiburn upgrade end, check board status")
        return True

    def _prepare_hiburn_tool(self, scriptpath, burn_usbport):
        """Return the per-USB-port hiburn directory, creating it if missing.

        On first use the bundled HiBurnCmdLine.zip is copied and extracted
        and ``usbDeviceNumber`` in config/burn.config is set to
        ``burn_usbport``. Returns None when the extraction fails.
        """
        toolworkspace = CONSTANT.OSType.getworkspace()
        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
        logger.info("hiburn tool path is: %s" % hiburntoolpath)
        if os.path.exists(hiburntoolpath):
            return hiburntoolpath

        os.makedirs(hiburntoolpath)
        toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
        logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
        copyFile(toolpath, hiburntoolpath)
        zip_name = os.path.basename(toolpath)
        if not extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath):
            # NOTE(review): the directory created above is left behind, so a
            # later run will take the "already exists" shortcut.
            logger.error("%s is not exit" % hiburntoolpath)
            return None
        logger.info("unzip to %s succ" % (hiburntoolpath))

        # Point burn.config at the USB port used for burning.
        configpath = os.path.join(hiburntoolpath, "config", "burn.config")
        config_lines = []
        with open(configpath, "r", encoding="utf-8") as cf:
            for line in cf:
                if "usbDeviceNumber=" in line:
                    line = "usbDeviceNumber=%s\r\n" % burn_usbport
                    logger.info("replace line: %s " % line)
                config_lines.append(line)
        with open(configpath, "w", encoding="utf-8") as wf:
            wf.write("".join(config_lines))
        return hiburntoolpath

    def _erase_fastboot(self, chip_type, deploy_com, usb_port, flash_uboot_xml):
        """Erase fastboot over the serial port; True on success.

        An attempt whose console output contains 'Unknown' is retried (up to
        ``_MAX_ATTEMPTS`` attempts in total); any other failure aborts.
        """
        import time  # not imported explicitly at module level

        for _ in range(self._MAX_ATTEMPTS):
            # Schedule the power cycle on a background thread so that it
            # happens while the blocking hiburn call below is running.
            PowerOnByThread(usb_port)
            cmd = "%s --erase -n %s -m serial %s -x %s" % (
                self._HIBURN_CMD, chip_type, deploy_com.upper(), flash_uboot_xml)
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("flash fastboot result: %s " % ret)
            logger.info("print console: %s " % outpri)
            if 'Unknown' in outpri:
                logger.info('串口问题 重新上下电 重新檫除')
                time.sleep(5)
                continue
            if ret == 0:
                return True
            logger.info('other error')
            return False
        return False

    def _burn_image(self, chip_type, xml_path):
        """Burn the image over USB; True on success.

        Exit status 4 is retried after 10 s (up to ``_MAX_ATTEMPTS`` attempts
        in total); any other non-zero status fails immediately.
        """
        import time  # not imported explicitly at module level

        for attempt in range(self._MAX_ATTEMPTS):
            cmd = "%s --burn -n %s -m USBBootrom -x %s" % (self._HIBURN_CMD, chip_type, xml_path)
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("usb upgrade result: %s " % ret)
            logger.info("print console: %s " % outpri)
            if ret == 0:
                return True
            if ret == 4 and attempt < self._MAX_ATTEMPTS - 1:
                time.sleep(10)
                logger.info('flash fail,so flash once again')
                continue
            logger.info(ret)
            logger.error("hiburn usb upgrade failed!!")
            return False
        return False
310
def PowerOnByThread(usb_port,wait_time=10):
    """Run boardPowerOn(usb_port, wait_time) on a new thread.

    The thread is started and not joined; this function returns right away
    and hands back no reference to the thread.
    """
    Thread(target=boardPowerOn, args=(usb_port, wait_time)).start()
    logger.info("thread board power on start")
315
316
def boardPowerOn(usb_port, waittime):
    """Power-cycle a USB port after waiting ``waittime`` seconds.

    The port is switched off and then on through usbPowerOnOff() at
    127.0.0.1:7788.

    Returns:
        False as soon as one of the two switch requests fails; None when
        both succeed.
    """
    logger.info("board power on start")
    time.sleep(waittime)

    # Power off first, then power on; stop at the first failing request.
    switch_steps = (
        ("off", "board power off failed"),
        ("on", "board power on failed"),
    )
    for state, failure_msg in switch_steps:
        if not usbPowerOnOff('127.0.0.1', '7788', usb_port, state):
            logger.error(failure_msg)
            return False
    logger.info("board power on end")
331
332
def getBoardType(ser):
    """Classify the board state from its reply to a carriage return.

    Sends '\\r' through sendCmd() and looks for known markers in the reply.

    Returns:
        'OHOS', 'uboot', 'linux' or, when no marker matches, 'bootrom'.
    """
    reply = sendCmd(ser, '\r', READ_TIMEOUT)
    # Checked in order; the first entry with a marker present in the reply
    # wins, so 'hisilicon' is tested before the more general ' #'.
    signatures = (
        (('HMOS', 'OHOS'), 'OHOS'),
        (('hisilicon',), 'uboot'),
        ((' #',), 'linux'),
    )
    ostype = 'bootrom'
    for markers, board_name in signatures:
        if any(marker in reply for marker in markers):
            ostype = board_name
            break
    logger.info("board type is: %s" % ostype)
    return ostype
345
def sendCmd(ser, cmd, timeout):
    """Write a command to the serial port and collect the reply.

    ``cmd`` plus a trailing newline is written to ``ser``. After 0.5 s the
    pending input is read in at most three chunks, pausing ``timeout``
    seconds between chunks; reading stops early once a read returns nothing.

    Args:
        ser: Serial-port object providing write(), read() and inWaiting().
        cmd: Command text; the terminating newline is appended here.
        timeout: Seconds to wait between two consecutive reads (a polling
            interval, not an overall deadline).

    Returns:
        The reply decoded as UTF-8 with undecodable bytes dropped; "" when
        nothing was received.
    """
    import time  # not imported explicitly at module level

    max_reads = 3
    logger.info("cmd is: %s " % cmd)
    ser.write((cmd + '\n').encode())
    time.sleep(0.5)

    chunks = []
    for read_index in range(max_reads):
        out = ser.read(ser.inWaiting())
        if not out:
            break
        chunks.append(out)
        # Only wait when another read will follow; data is never read from
        # the port just to be thrown away.
        if read_index < max_reads - 1:
            time.sleep(timeout)

    # Decode once over the joined bytes so that a multi-byte character split
    # across two reads is not lost.
    ret = b"".join(chunks).decode(encoding="utf-8", errors="ignore")
    logger.info("result is: %s " % ret)
    return ret
363
364
365