#-*- coding:utf-8 -*-
import uuid
import sys
import subprocess
import os
import serial
import datetime
import hashlib
import time

from core.base import BaseApp, dec_stepmsg
from util.file_locker import FileLock
from util.log_info import logger
from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
from aw.Telnet.TelnetClient import TelConnect
from aw.Common.Constant import CONSTANT
from aw.Download.Download import *
from aw.Common.Common import getHostIp, copyFile, copyDirectory
from aw.ExtractFile.ExtractFile import *
from aw.poweronoff.serial_power_on_off import usbPowerOnOff, usbPowerOnOffV2
from threading import Thread
from subprocess import getstatusoutput

# Name of the lock file used to serialise concurrent downloads.
lock_suffix = CONSTANT.File.LOCK_SUFFIX
# Marker file whose presence tells that a version was downloaded successfully.
suc_file = CONSTANT.File.SUC_FILE
# Marker file whose presence tells that a download failed.
failed_file = CONSTANT.File.FAILED_FILE
READ_MAXTIMEOUT = 5
READ_TIMEOUT = 5
READ_MINITIMEOUT = 2
uboot_finish = 'hisilicon #'
cmd_finish = ' #'
# Substrings of the hiburn console output that trigger another erase attempt.
error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败']

# Set by download(); read by upgrade() after its background download thread
# has been joined.
_is_download_success = False


class liteOsUpgrade_L2(BaseApp):
    '''Flash step: download the image package and burn it with HiBurn.'''

    def __init__(self, param_file):
        super().__init__(param_file)
        self.param_List = ["deploy_com",
                           "usb_port",
                           "upgrade_upgradeLocation"]

    @dec_stepmsg("hongmeng L2 flash")
    def excute(self):
        '''
        Entry point of the step: download the image, then flash the board.

        @return: True when both download and upgrade succeed, otherwise False.
                 Exceptions raised by download()/upgrade() propagate unchanged.
        '''
        # NOTE(review): the index is never used; the lookup is kept because it
        # raises when the step is missing from "step_list" - confirm whether
        # that validation is intended.
        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_L2_app")

        # Download the image package.
        if not self.download():
            CONSTANT.ENVERRMESSAGE = "image download fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False

        # Flash the board.
        if not self.upgrade():
            CONSTANT.ENVERRMESSAGE = "board upgrade fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False
        return True

    @dec_stepmsg("download")
    @timeout(18000)
    def download(self):
        '''
        Build the local save path for the version and download the package.

        Publishes the module globals version_savepath / version_name and sets
        _is_download_success when the image is available locally.

        @return: True or False
        '''
        global version_savepath, version_name, _is_download_success
        dir_path = CONSTANT.Path.getDirPath()
        pbiid = self.params_dict.get("pbiid")
        if pbiid:
            version_path = pbiid
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(pbiid) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, version_path))
            version_savepath = os.path.join(dir_path, version_name)

        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            # upgrade() gates on this flag, not on the return value, so the
            # "nothing to download" shortcut has to set it as well.
            _is_download_success = True
            return True

        # Download the image package.
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash value:%s" % (save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        logger.printLog('save_path_file: %s' % save_path_file)
        image_dir = os.path.join(version_savepath, "img")
        if not self.excutedown(version_path, image_dir, save_path_file, False):
            logger.error("download img fail")
            return False
        _is_download_success = True
        # Record the local version path so that devicetest can fetch the test
        # cases from it.
        saveVersion(save_path_file, image_dir)

        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        '''
        Download (and unpack) one package under a file lock.

        @Param: source_path:  location of the package (UNC path, http url, ...)
                download_dir: local directory the package is downloaded to
                suc_mark:     success marker passed to isDownLoadSuccess
                is_file:      whether source_path is a single file
        @return: truthy on success, falsy on failure; on failure an empty
                 failed-marker file is created in download_dir.
        @raise: Exception wrapping any error raised while downloading.
        '''
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
                # NOTE(review): os_method is not defined in this file; it must
                # come from one of the star imports - confirm.
                ret = downloadByBitComet(source_path, download_dir, os_method)
            elif source_path.startswith('\\\\'):
                ret = downloadByCopy(source_path, download_dir, is_file)
            elif self.params_dict.get("pbiid"):
                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
            elif source_path.startswith("http"):
                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)

            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)

            is_file_id_url = source_path.startswith("http") and ("file_id=" in source_path)
            if source_path.endswith(".tar.gz") or is_file_id_url:
                if is_file_id_url:
                    # For "file_id=" urls the archive name is the last
                    # "="-separated field when it ends with .tar.gz,
                    # otherwise the fixed name "out.tar.gz" is used.
                    if source_path.endswith(".tar.gz"):
                        zip_name = source_path.split('=')[-1]
                    else:
                        zip_name = "out.tar.gz"
                else:
                    zip_name = os.path.basename(source_path)
                logger.printLog(f'tar_file:{os.path.join(download_dir, zip_name)},dest_file:{download_dir}')
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # Touch the failed marker.
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            logger.printLog(e)
            raise Exception(e) from e
        finally:
            file_lock.releaseFile()

    @dec_stepmsg("upgrade")
    def upgrade(self):
        '''
        Erase fastboot over serial, burn the image over USB, then verify that
        the board booted.

        @return: True or False
        '''
        logger.printLog('开始升级')
        deploy_com = self.params_dict.get("deploy_com")
        usb_port = self.params_dict.get("usb_port")
        baudrate = self.params_dict.get("baudrate")
        # Chip type; selects the flashing command/resources.
        flash_type = self.params_dict.get("flash_type")
        burn_usbport = self.params_dict.get("hiburn_usbport")
        sn = self.params_dict.get('sn')

        if not deploy_com:
            logger.error("deploy_com is NULL !!")
            return False
        if not burn_usbport:
            logger.error("hiburn_usbport is NULL !!")
            return False
        if not baudrate:
            baudrate = 115200

        # Validate the chip type before any work is started.
        flash_key = flash_type.lower() if flash_type else ""
        chip_type = {"ev300": "Hi3518EV300", "dv300": "Hi3516DV300"}.get(flash_key)
        if chip_type is None:
            logger.error("flash_type is : %s " % flash_type)
            return False

        # Run the download in the background while the board is being erased.
        t_download = Thread(target=self.download, args=())
        t_download.start()
        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
        # Per-usb-port copy of the HiBurn command line tool.
        toolworkspace = CONSTANT.OSType.getworkspace()
        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
        logger.info("hiburn tool path is: %s" % hiburntoolpath)
        if not os.path.exists(hiburntoolpath):
            if not self._prepareHiburnTool(hiburntoolpath, scriptpath, burn_usbport):
                return False

        # The hiburn commands use paths relative to the tool directory, so
        # they are run from there; the previous cwd is always restored.
        current_path = os.getcwd()
        os.chdir(hiburntoolpath)
        try:
            # Erase fastboot.
            logger.printLog('erase fastboot')
            flash_uboot_xml = os.path.join(scriptpath, "resource", "L2", flash_key, "flash_fastboot.xml")
            cmd = ".\\jre\\bin\\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
            self.eraseDevice(cmd, usb_port)
            t_download.join()
            # Check whether the download succeeded.
            if not _is_download_success:
                return False

            # Copy the partition table needed for flashing next to the images.
            local_image_path = os.path.join(version_savepath, "img")
            old_xml_path = os.path.join(scriptpath, "resource", "L2", flash_key, "Hi3516DV300-emmc.xml")
            xml_path = os.path.join(local_image_path, "Hi3516DV300-emmc.xml")
            copyFile(old_xml_path, xml_path)
            scriptfile = os.path.join(scriptpath, "resource", "L2", flash_key, "update.txt")
            logger.info(f'scriptfile:{scriptfile}')

            logger.printLog('进行烧写')
            if not self._burnByUsb(chip_type, xml_path):
                return False
        finally:
            os.chdir(current_path)

        logger.info("hiburn upgrade end, check board status")
        logger.info("为保持hdc稳定性,等待60秒")
        time.sleep(60)
        try:
            self.hdc_set_time(sn)
        except Exception as e:
            # Setting the time is best effort; a failure does not fail the step.
            logger.printLog('hdc设置时间超时')
            logger.printLog(str(e))
        return self._checkBoardAfterFlash(deploy_com, baudrate)

    def _prepareHiburnTool(self, hiburntoolpath, scriptpath, burn_usbport):
        '''Unpack HiBurnCmdLine.zip into hiburntoolpath and set its usb port.'''
        os.makedirs(hiburntoolpath)
        toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
        logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
        copyFile(toolpath, hiburntoolpath)
        zip_name = os.path.basename(toolpath)
        if not extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath):
            logger.error("%s is not exit" % hiburntoolpath)
            return False
        logger.info("unzip to %s succ" % (hiburntoolpath))

        # Rewrite the usb port in burn.config.
        configpath = os.path.join(hiburntoolpath, "config", "burn.config")
        lines = []
        with open(configpath, "r", encoding="utf-8") as cf:
            for line in cf:
                if "usbDeviceNumber=" in line:
                    # NOTE(review): "\r\n" written through a text-mode file
                    # may end up as "\r\r\n" on Windows - kept as is.
                    line = "usbDeviceNumber=%s\r\n" % burn_usbport
                    logger.info("replace line: %s " % line)
                lines.append(line)
        with open(configpath, "w", encoding="utf-8") as wf:
            wf.write("".join(lines))
        return True

    def _burnByUsb(self, chip_type, xml_path):
        '''Burn the image over USB; exit code 4 is retried up to two times.'''
        cmd = ".\\jre\\bin\\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
        retry = 0
        while retry < 3:
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("usb upgrade result: %s " % ret)
            logger.info("print console: %s " % outpri)
            if ret == 0:
                break
            if ret == 4 and retry < 2:
                time.sleep(10)
                retry = retry + 1
                logger.info('flash fail,so flash once again')
                continue
            logger.info(ret)
            logger.error("hiburn usb upgrade failed!!")
            return False
        return True

    def _checkBoardAfterFlash(self, deploy_com, baudrate):
        '''Open the serial console and check that the board booted into OHOS.'''
        ser = None
        try:
            logger.info("打开serial")
            ser = serial.Serial(deploy_com, int(baudrate), timeout=0)
            if ser.is_open == False:
                ser.open()
            logger.info("get device status")
            board_type = getBoardType(ser)
            return bool(board_type == "OHOS" and self.checkStatus(ser))
        except Exception as e:
            logger.info(str(e))
            return False
        finally:
            # ser stays None when opening the port itself failed.
            if ser is not None:
                ser.close()
            logger.info("close serial")

    def checkStatus(self, ser):
        '''
        Wait for the system to start, then check over the serial console that
        render_service and com.ohos.launcher each keep the same pid across two
        samples.

        @return: True when both processes are present and stable, else False.
        '''
        time.sleep(180)
        ren_cmd = 'ps -elf | grep render_service'
        laun_cmd = 'ps -elf | grep com.ohos.launcher'
        time.sleep(10)
        ren_status_1 = getPsPid(ser, ren_cmd, 'render_service')
        time.sleep(5)
        ren_status_2 = getPsPid(ser, ren_cmd, 'render_service')
        time.sleep(5)
        laun_status_1 = getPsPid(ser, laun_cmd, 'com.ohos.launcher')
        time.sleep(5)
        laun_status_2 = getPsPid(ser, laun_cmd, 'com.ohos.launcher')
        logger.info('ren_status_1: %s ren_status_2: %s laun_1: %s laun_2: %s '%(ren_status_1,ren_status_2,laun_status_1,laun_status_2))
        if ren_status_1 and ren_status_1 == ren_status_2 and laun_status_1 and laun_status_1 == laun_status_2:
            return True
        if not ren_status_1 or not ren_status_2 or ren_status_1 != ren_status_2:
            logger.printLog('process render_service is not exist')
        if not laun_status_1 or not laun_status_2:
            logger.printLog('process com.ohos.launcher is not exist')
        return False

    @timeout(30)
    def hdc_set_time(self, sn):
        '''
        Set the device clock to the host's local time through hdc_std.

        @return: True when the command output echoes the time that was set,
                 otherwise False.
        '''
        # One timestamp for both parts, so date and time cannot straddle a
        # day change.
        now = time.localtime()
        y_m_d = time.strftime('%Y-%m-%d', now)
        h_m_s = time.strftime('%H:%M:%S', now)
        cmd = 'hdc_std -t %s shell date -u "%sT%s"'% (sn,y_m_d,h_m_s)
        logger.printLog('hdc start set time')
        logger.printLog('cmd is %s'% cmd)
        ret, out = subprocess.getstatusoutput(cmd)
        logger.printLog('hdc end set time')
        if h_m_s not in out:
            logger.printLog('hdc设置时间失败')
            logger.printLog(out)
            return False
        logger.printLog(out)
        return True

    def eraseDevice(self, cmd, usb_port):
        '''
        Run the erase command, power-cycling the board in a background thread,
        for at most three attempts.

        An attempt is repeated when the console output contains one of
        error_str_list.

        @return: True when the erase command exits with 0 and no known error
                 string is printed, otherwise False.
        '''
        for _ in range(3):
            # Power-cycle the board (delayed, in a background thread).
            PowerOnByThread(usb_port)
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("flash fastboot result: %s " % ret)
            logger.info("print console: %s " % outpri)
            if any(item in outpri for item in error_str_list):
                logger.info('檫除存在问题 重新上下电 重新檫除')
                time.sleep(5)
                continue
            if ret == 0:
                logger.info('檫除成功'.center(20,'*'))
                return True
            logger.info('other error')
            return False
        return False

    def catchEraseFail(self, cmd):
        '''Run cmd, logging the time before and after; return (status, output).'''
        logger.printLog(datetime.datetime.now())
        ret, outpri = subprocess.getstatusoutput(cmd)
        logger.printLog(datetime.datetime.now())
        return ret, outpri


def getPsPid(ser, cmd, ps):
    '''
    Run a "ps | grep" command over the serial console and return the second
    whitespace-separated field of the line that mentions ps (the grep process
    itself is ignored).

    @return: the field as a string, or None when there is no usable line or
             the command fails.
    '''
    try:
        data = sendCmd(ser, cmd, READ_TIMEOUT)
        logger.info(data)
        matches = [item for item in data.split('\n') if 'grep' not in item and ps in item]
        if not matches:
            return None
        line = matches[0]
        if len(matches) > 1:
            # Several candidates: keep the longest line.
            line = max(matches, key=len)
            logger.info(line)
        fields = line.split()
        if len(fields) < 2:
            return None
        logger.info(fields)
        return fields[1]
    except Exception as e:
        logger.info(str(e))
        logger.info('pid 获取失败')
        return None


def PowerOnByThread(usb_port, wait_time=10):
    '''Start boardPowerOn(usb_port, wait_time) in a background thread.'''
    thread = Thread(target=boardPowerOn, args=[usb_port, wait_time])
    thread.start()
    logger.info("thread board power on start")


def boardPowerOn(usb_port, waittime):
    '''
    Wait waittime seconds, then power the usb port off and on again.

    @return: False when either switch request fails, otherwise None.
    '''
    logger.info("board power on start")
    time.sleep(waittime)

    # Power the port off.
    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"):
        logger.error("board power off failed")
        return False

    # Power the port on.
    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"):
        logger.error("board power on failed")
        return False
    logger.info("board power on end")


def getBoardType(ser):
    '''
    Send an empty line over the serial console and classify the prompt.

    @return: 'OHOS', 'uboot', 'linux' or 'bootrom'
    '''
    ret = sendCmd(ser, '\r', READ_TIMEOUT)
    # NOTE(review): any '#' is treated as OHOS, so the 'linux' branch can
    # never be taken and a 'hisilicon #' prompt is reported as OHOS - confirm
    # that this is intended before changing it.
    if 'HMOS' in ret or 'OHOS' in ret or 'console:/ $' in ret or '#' in ret:
        ostype = 'OHOS'
    elif 'hisilicon' in ret:
        ostype = 'uboot'
    elif ' #' in ret:
        ostype = 'linux'
    else:
        ostype = 'bootrom'
    logger.info("board type is: %s" % ostype)
    return ostype


def sendCmd(ser, cmd, timeout):
    '''
    Write cmd to the serial port and collect the reply.

    Pending bytes are read repeatedly, sleeping timeout seconds between reads,
    until a read returns nothing or 13 chunks were collected.

    @return: the decoded reply (undecodable bytes are dropped).
    '''
    logger.info("cmd is: %s " % cmd)
    ser.write((cmd + '\n').encode())
    time.sleep(0.5)
    ret = ''
    i = 0
    while True:
        out = ser.read(ser.inWaiting())
        if not out:
            break
        if i > 12:
            break
        ret = ret + out.decode(encoding="utf-8", errors="ignore")
        time.sleep(timeout)
        i = i + 1
    logger.info("result is: %s " % ret)
    return ret


def sendHdcCmd(cmd):
    '''
    Run an hdc command, retrying when executeHdcCmd raises (e.g. on timeout).

    Up to four attempts are made, one second apart.

    @return: the command output, or '' when every attempt raised.
    '''
    for _ in range(4):
        try:
            out = executeHdcCmd(cmd)
            logger.info('hdc cmd success')
            return out
        except Exception as e:
            logger.info(str(e))
            logger.info('hdc cmd fail')
            time.sleep(1)
    logger.error('hdc cmd execute fail by three times')
    return ''


@timeout(10)
def executeHdcCmd(cmd):
    '''
    Run cmd in a shell.

    @return: the command output, or the string 'execute fail' when the exit
             status is not 0.
    '''
    logger.info('cmd is %s' % cmd)
    time.sleep(0.5)
    ret, out = getstatusoutput(cmd)
    time.sleep(0.5)
    if ret != 0:
        logger.info('execute fail')
        return 'execute fail'
    logger.info('result is %s'% out)
    return out


if __name__ == '__main__':
    ser = serial.Serial('com5', 115200, timeout=0)