• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#-*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6import serial
7import datetime
8
9from core.base import BaseApp, dec_stepmsg
10from util.file_locker import FileLock
11from util.log_info import logger
12from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
13from aw.Telnet.TelnetClient import TelConnect
14from aw.Common.Constant import CONSTANT
15from aw.Download.Download import *
16from aw.Common.Common import getHostIp, copyFile, copyDirectory
17from aw.ExtractFile.ExtractFile import *
18from aw.poweronoff.serial_power_on_off import usbPowerOnOff, usbPowerOnOffV2
19from threading import Thread
20from subprocess import getstatusoutput
21
22lock_suffix = CONSTANT.File.LOCK_SUFFIX  #通过文件锁实现并发下载
23suc_file = CONSTANT.File.SUC_FILE        #通过本文件是区分版本是否成功下载
24failed_file = CONSTANT.File.FAILED_FILE  #通过本文件是标记文件下载失败
25READ_MAXTIMEOUT = 5
26READ_TIMEOUT = 5
27READ_MINITIMEOUT = 2
28uboot_finish = 'hisilicon #'
29cmd_finish = ' #'
30error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败']
31
32_is_download_success = False
33
34
35class liteOsUpgrade_L2(BaseApp):
36
37    def __init__(self, param_file):
38        super().__init__(param_file)
39        self.param_List = ["deploy_com",
40                        "usb_port",
41                           "upgrade_upgradeLocation"]
42
43    @dec_stepmsg("hongmeng L2 flash")
44    def excute(self):
45        '''
46        #===================================================================================
47        #   @Method:        excute(self)
48        #   @Precondition:  none
49        #   @Func:          升级执行入口
50        #   @PostStatus:    none
51        #   @eg:            excute()
52        #   @return:        True or Flase
53        #===================================================================================
54        '''
55        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_L2_app")
56
57        # 执行下载
58        try:
59            if not self.download():
60                CONSTANT.ENVERRMESSAGE = "image download fail"
61                logger.printLog(CONSTANT.ENVERRMESSAGE)
62                return False
63        except Exception as e:
64            raise e
65
66
67        # 执行升级
68        try:
69            if not self.upgrade():
70                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
71                logger.printLog(CONSTANT.ENVERRMESSAGE)
72                return False
73            return True
74        except Exception as e:
75            raise e
76
77    @dec_stepmsg("download")
78    @timeout(18000)
79    def download(self):
80        '''
81        #===================================================================================
82        #   @Method:        download(self)
83        #   @Precondition:  none
84        #   @Func:          构建下载到本地的路径,执行相应包的下载
85        #   @PostStatus:    none
86        #   @eg:            download()
87        #   @return:        True or Flase
88        #===================================================================================
89        '''
90        global version_savepath, version_name, _is_download_success
91        dir_path = CONSTANT.Path.getDirPath()
92        if self.params_dict.get("pbiid"):
93            version_path = self.params_dict.get("pbiid")
94            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
95            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
96        else:
97            version_path = self.params_dict.get("upgrade_upgradeLocation")
98            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
99            version_savepath = os.path.join(dir_path, version_name)
100
101        if self.params_dict.get("isDownload") == "True":
102            logger.printLog("不需要做下载,直接返回")
103            return True
104
105        #执行img下载
106        import hashlib
107        save_file_str = version_path.replace("/", "").replace("\\", "")
108        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
109        logger.info("download hash value:%s" % (save_file_name))
110        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
111        logger.printLog('save_path_file: %s' % save_path_file)
112        if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False):
113            logger.error("download img fail")
114            return False
115        _is_download_success = True
116        #保存本地版本路径给devicetest去版本路径下取用例
117        saveVersion(save_path_file, os.path.join(version_savepath, "img"))
118
119        return True
120
121    def excutedown(self, source_path, download_dir, suc_mark, is_file):
122        '''
123        #===================================================================================
124        #   @Method:        excutedown(source_path, download_dir, is_file)
125        #   @Precondition:  none
126        #   @Func:          执行下载动作
127        #   @PostStatus:    none
128        #   @Param:         source_path:资源文件路径
129        #                   download_dir:文件下载到本地的文件夹路径
130        #                   is_file:是否是文件
131        #
132        #   @eg:            excutedown("xxxx", "D:\\local\\image", Flase, os_method)
133        #   @return:        True or Flase
134        #===================================================================================
135        '''
136        failed_mark = os.path.join(download_dir, failed_file)
137        lock_path = os.path.join(download_dir, lock_suffix)
138        file_lock = FileLock()
139
140        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
141            return True
142        try:
143            nowtime = get_now_time_str_info()
144            logger.printLog("%s Downloading, please wait" % nowtime)
145            file_lock.lockFile(lock_path)
146            ret = ""
147            logger.info("Get lock. Start to ")
148            if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
149                ret = downloadByBitComet(source_path, download_dir, os_method)
150            elif source_path.startswith('\\\\'):
151                ret = downloadByCopy(source_path, download_dir, is_file)
152            elif self.params_dict.get("pbiid"):
153                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
154            elif source_path.startswith("http"):
155                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)
156
157            if source_path.endswith(".zip"):
158                zip_name = os.path.basename(source_path)
159                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
160            if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)):
161                if source_path.startswith("http") and ("file_id=" in source_path):
162                    if source_path.endswith(".tar.gz"):
163                        zip_name = source_path.split('=')[-1]
164                    else:
165                        zip_name = "out.tar.gz"
166                else:
167                    zip_name = os.path.basename(source_path)
168                logger.printLog(f'tar_file:{os.path.join(download_dir, zip_name)},dest_file:{download_dir}')
169                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
170            nowtime = get_now_time_str_info()
171            logger.printLog("%s download to %s end" % (nowtime, download_dir))
172
173            if not ret:
174                with open(failed_mark, "a+") as fp:
175                    fp.write("")
176            return ret
177        except Exception as e:
178            logger.printLog(e)
179            raise Exception(e)
180        finally:
181            file_lock.releaseFile()
182
183
184    @dec_stepmsg("upgrade")
185    #@timeout(900)
186    def upgrade(self):
187        '''
188        #===================================================================================
189        #   @Method:        upgrade(self)
190        #   @Precondition:  none
191        #   @Func:          升级相关业务逻辑
192        #   @PostStatus:    none
193        #   @eg:            upgrade()
194        #   @return:        True or Flase
195        #===================================================================================
196        '''
197        global _is_download_success
198        logger.printLog('开始升级')
199        deploy_com = self.params_dict.get("deploy_com")
200        usb_port = self.params_dict.get("usb_port")
201        baudrate = self.params_dict.get("baudrate")
202        #芯片类型,根据芯片类型获取对应的刷机命令
203        flash_type = self.params_dict.get("flash_type")
204        burn_usbport = self.params_dict.get("hiburn_usbport")
205        device_ip = self.params_dict.get("Device_IP")
206        device_netmask = self.params_dict.get("Device_Netmask")
207        device_gatewayip = self.params_dict.get("Device_GatewayIP")
208        chip_version = self.params_dict.get('chip_version')
209        chip_version = chip_version.lower() if chip_version else chip_version
210        sn = self.params_dict.get('sn')
211
212        if not deploy_com:
213            logger.error("deploy_com is NULL !!")
214            return False
215        if not burn_usbport:
216            logger.error("hiburn_usbport is NULL !!")
217            return False
218        if not baudrate:
219            baudrate = 115200
220        #执行下载
221        t_download = Thread(target=self.download, args=())
222        t_download.start()
223        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
224        #升级需要的工具归档
225        toolworkspace = CONSTANT.OSType.getworkspace()
226        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
227        logger.info("hiburn tool path is: %s" % hiburntoolpath)
228        if not os.path.exists(hiburntoolpath):
229            if not burn_usbport:
230                logger.error("hiburn_usbport is NULL !!")
231                return False
232            os.makedirs(hiburntoolpath)
233            toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
234            logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
235            copyFile(toolpath, hiburntoolpath)
236            zip_name = os.path.basename(toolpath)
237            ret = extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath)
238            if ret:
239                logger.info("unzip to %s succ" % (hiburntoolpath))
240                #修改burn.config中的usb口
241                configpath = os.path.join(hiburntoolpath, "config", "burn.config")
242                all_data = ""
243                with open(configpath, "r", encoding="utf-8") as cf:
244                    for line in cf:
245                        if "usbDeviceNumber=" in line:
246                            old_str = line
247                            line = line.replace(old_str, "usbDeviceNumber=%s\r\n" % burn_usbport)
248                            logger.info("replace line: %s " % line)
249                        all_data += line
250                with open(configpath, "w", encoding="utf-8") as wf:
251                    wf.write(all_data)
252            else:
253                logger.error("%s is not exit" % hiburntoolpath)
254                return False
255        os.chdir(hiburntoolpath)
256        if flash_type.lower() == "ev300":
257            chip_type = "Hi3518EV300"
258        elif flash_type.lower() == "dv300":
259            chip_type = "Hi3516DV300"
260        else:
261            logger.error("flash_type is : %s " % flash_type)
262            return False
263        #擦除fastboot
264        logger.printLog('erase fastboot')
265        flash_uboot_xml = os.path.join(scriptpath, "resource", "L2", flash_type.lower(), "flash_fastboot.xml")
266        cmd = ".\jre\\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
267        self.eraseDevice(cmd, usb_port)
268        t_download.join()
269        # 检查下载是否成功
270        if not _is_download_success:
271            return False
272
273        #将升级需要的文件拷贝到镜像里面
274        local_image_path = os.path.join(version_savepath, "img")
275        old_xml_path = os.path.join(scriptpath, "resource", "L2", flash_type, "Hi3516DV300-emmc.xml")
276        xml_path = os.path.join(local_image_path, "Hi3516DV300-emmc.xml")
277        copyFile(old_xml_path, xml_path)
278        # copyFile(ubootpath, local_image_path)
279        scriptfile = os.path.join(scriptpath, "resource", "L2", f'{flash_type.lower()}', "update.txt")
280        logger.info(f'scriptfile:{scriptfile}')
281
282        logger.printLog('进行烧写')
283        retry = 0
284        while retry < 3:
285            #usb刷机
286            cmd = ".\jre\\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
287            logger.info("cmd is: %s" % cmd)
288            ret, outpri = subprocess.getstatusoutput(cmd)
289            logger.info("usb upgrade result: %s " % ret)
290            logger.info("print console: %s " % outpri)
291            if ret != 0:
292                if ret == 4 and retry < 2:
293                    time.sleep(10)
294                    retry = retry + 1
295                    logger.info('flash fail,so flash once again')
296                    continue
297                logger.info(ret)
298                logger.error("hiburn usb upgrade failed!!")
299                return False
300            retry = retry + 3
301        os.chdir(current_path)
302        logger.info("hiburn upgrade end, check board status")
303        logger.info("为保持hdc稳定性,等待60秒")
304        time.sleep(60)
305        try:
306            #pass
307            self.hdc_set_time(sn)
308        except Exception as e:
309            logger.printLog('hdc设置时间超时')
310            logger.printLog(str(e))
311        try:
312            logger.info("打开serial")
313            ser = serial.Serial(deploy_com, int(baudrate), timeout=0)
314            if ser.is_open == False:
315                ser.open()
316            logger.info("get device status")
317            board_type = getBoardType(ser)
318            if board_type == "OHOS" and self.checkStatus(ser):
319                return True
320
321                # ret = sendCmd(ser, 'sa;reset;', READ_MAXTIMEOUT)
322                # board_type = getBoardType(ser)
323                # if board_type != "OHOS":
324                #     logger.error("upgrade fail")
325                #     return False
326            else:
327                return False
328        except Exception as e:
329            logger.info(str(e))
330            return False
331        finally:
332            ser.close()
333            logger.info("close serial")
334
335    def checkStatus(self, ser):
336        time.sleep(180)
337        # print('wait 120 seconds')
338        ren_cmd = 'ps -elf | grep render_service'
339        laun_cmd = 'ps -elf | grep com.ohos.launcher'
340        time.sleep(10)
341        ren_status_1 = getPsPid(ser, ren_cmd, 'render_service')
342        time.sleep(5)
343        ren_status_2 = getPsPid(ser, ren_cmd, 'render_service')
344        time.sleep(5)
345        laun_status_1 = getPsPid(ser, laun_cmd, 'com.ohos.launcher')
346        time.sleep(5)
347        laun_status_2 = getPsPid(ser, laun_cmd, 'com.ohos.launcher')
348        # wes_cmd = 'pidof weston'
349        # laun_cmd = 'pidof com.ohos.launcher'
350        # time.sleep(10)
351        # wes_status_1 = sendCmd(ser, wes_cmd, READ_TIMEOUT)
352        # time.sleep(5)
353        # wes_status_2 = sendCmd(ser, wes_cmd, READ_TIMEOUT)
354        # time.sleep(5)
355        # laun_status_1 = sendCmd(ser, laun_cmd, READ_TIMEOUT)
356        # time.sleep(5)
357        # laun_status_2 = sendCmd(ser, laun_cmd, READ_TIMEOUT)
358        logger.info('ren_status_1: %s ren_status_2: %s laun_1: %s laun_2: %s '%(ren_status_1,ren_status_2,laun_status_1,laun_status_2))
359        if ren_status_1 and ren_status_1 == ren_status_2 and laun_status_1 and laun_status_1 == laun_status_2:
360            return True
361        if not ren_status_1 or not ren_status_2 or ren_status_1 != ren_status_2:
362            # and not wes_status_1.strip().isdigit() or not wes_status_2.strip().isdigit()
363            logger.printLog('process render_service is not exist')
364        if not laun_status_1 or not laun_status_2 :
365            # or not laun_status_1.strip().isdigit() or not laun_status_2.strip().isdigit()
366            logger.printLog('process com.ohos.launcher is not exist')
367        return False
368
369    @timeout(30)
370    def hdc_set_time(self,sn):
371        # todo
372        y_m_d = time.strftime('%Y-%m-%d',time.localtime())
373        h_m_s = time.strftime('%H:%M:%S',time.localtime())
374        cmd = 'hdc_std -t %s shell date -u "%sT%s"'% (sn,y_m_d,h_m_s)
375        logger.printLog('hdc start set time')
376        logger.printLog('cmd is %s'% cmd)
377        ret,out = subprocess.getstatusoutput(cmd)
378        logger.printLog('hdc end set time')
379        if h_m_s not in out:
380            logger.printLog('hdc设置时间失败')
381            logger.printLog(out)
382            return False
383        logger.printLog(out)
384
385    def eraseDevice(self, cmd, usb_port):
386        '''
387        ret 暂时没有作用,先使用out
388        '''
389        erase_retry = 0
390        while erase_retry < 3:
391            # 通电
392            PowerOnByThread(usb_port)
393            logger.info("cmd is: %s" % cmd)
394            ret, outpri = subprocess.getstatusoutput(cmd)
395            logger.info("flash fastboot result: %s " % ret)
396            logger.info("print console: %s " % outpri)
397            is_earse_again = any([True if item in outpri else False for item in error_str_list])
398            if ret == 0 and erase_retry < 2 and not is_earse_again :
399                logger.info('檫除成功'.center(20,'*'))
400                break
401            elif is_earse_again:
402                logger.info('檫除存在问题 重新上下电 重新檫除')
403                erase_retry += 1
404                time.sleep(5)
405                continue
406            else:
407                logger.info('other error')
408                return False
409        else:
410            return False
411
412    def catchEraseFail(self,cmd):
413        logger.printLog(datetime.datetime.now())
414        ret, outpri = subprocess.getstatusoutput(cmd)
415        logger.printLog(datetime.datetime.now())
416        return ret,outpri
417
418def getPsPid(ser, cmd, ps):
419    try:
420        data = sendCmd(ser, cmd, READ_TIMEOUT)
421        logger.info(data)
422        data = [item for item in data.split('\n') if 'grep' not in item and ps in item ]
423        if len(data) < 1:
424            return
425        elif len(data) > 1:
426            data = max(data, key=len, default='')
427        logger.info(data)
428        data = data[0].split()
429        if len(data) < 1:
430            return
431        else:
432            logger.info(data)
433            return data[1]
434    except Exception as e :
435        logger.info(str(e))
436        logger.info('pid 获取失败')
437
438def PowerOnByThread(usb_port,wait_time=10):
439    thread = Thread(target=boardPowerOn, args=[usb_port, wait_time])
440    thread.start()
441    logger.info("thread board power on start")
442
443
444
445def boardPowerOn(usb_port, waittime):
446    logger.info("board power on start")
447    time.sleep(waittime)
448
449    #对端口下电
450    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"):
451        logger.error("board power off failed")
452        return False
453
454    #对端口上电
455    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"):
456        logger.error("board power on failed")
457        return False
458    logger.info("board power on end")
459
460
461def getBoardType(ser):
462    ret = sendCmd(ser, '\r', READ_TIMEOUT)
463    if 'HMOS' in ret or 'OHOS' in ret or 'console:/ $' in ret or '#' in ret:
464        ostype = 'OHOS'
465    elif 'hisilicon' in ret:
466        ostype = 'uboot'
467    elif ' #' in ret:
468        ostype = 'linux'
469    else:
470        ostype = 'bootrom'
471    logger.info("board type is: %s" % ostype)
472    return ostype
473
474def sendCmd(ser, cmd, timeout):
475    logger.info("cmd is: %s " % cmd)
476    ser.write((cmd + '\n').encode())
477    time.sleep(0.5)
478    ret = ''
479    i = 0
480    while True:
481        out = ser.read(ser.inWaiting())
482        if not out:
483            break
484        if i > 12:
485            break
486        ret = ret + out.decode(encoding="utf-8", errors="ignore")
487        time.sleep(timeout)
488        i = i + 1
489    logger.info("result is: %s " % ret)
490    return ret
491
492def sendHdcCmd(cmd):
493    from subprocess import getstatusoutput
494    i = 0
495    while True:
496        if i > 3:
497            break
498        try:
499            out = executeHdcCmd(cmd)
500            logger.info('hdc cmd success')
501            return out
502        except Exception as e:
503            logger.info(str(e))
504            logger.info('hdc cmd fail')
505            i += 1
506            time.sleep(1)
507            continue
508    logger.error('hdc cmd execute fail by three times')
509    return ''
510
511@timeout(10)
512def executeHdcCmd(cmd):
513    logger.info('cmd is %s' % cmd)
514    time.sleep(0.5)
515    ret, out = getstatusoutput(cmd)
516    time.sleep(0.5)
517    if ret != 0:
518        logger.info('execute fail')
519        return 'execute fail'
520    logger.info('result is %s'% out)
521    return out
522
523if __name__ == '__main__':
524    ser = serial.Serial('com5', 115200, timeout=0)