1#-*- coding:utf-8 -*- 2import uuid 3import sys 4import subprocess 5import os 6import serial 7import time 8import re 9 10from core.base import BaseApp, dec_stepmsg 11from util.file_locker import FileLock 12from util.log_info import logger 13from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 14from aw.Telnet.TelnetClient import TelConnect 15from aw.Common.Constant import CONSTANT 16from aw.Download.Download import * 17from aw.Common.Common import getHostIp, copyFile, copyDirectory 18from aw.ExtractFile.ExtractFile import * 19from aw.poweronoff.serial_power_on_off import usbPowerOnOff 20from threading import Thread 21 22lock_suffix = CONSTANT.File.LOCK_SUFFIX #通过文件锁实现并发下载 23suc_file = CONSTANT.File.SUC_FILE #通过本文件是区分版本是否成功下载 24failed_file = CONSTANT.File.FAILED_FILE #通过本文件是标记文件下载失败 25READ_MAXTIMEOUT = 5 26READ_TIMEOUT = 5 27READ_MINITIMEOUT = 2 28uboot_finish = 'hisilicon #' 29cmd_finish = ' #' 30error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败'] 31ip_cmd = 'ping 192.168.18.1' 32 33class liteOsUpgrade_linux(BaseApp): 34 ''' 35 @author: cwx1076044 36 ''' 37 38 def __init__(self, param_file): 39 super().__init__(param_file) 40 self.param_List = ["deploy_com", 41 "usb_port", 42 "upgrade_upgradeLocation"] 43 44 @dec_stepmsg("linux L1 flash") 45 def excute(self): 46 ''' 47 #=================================================================================== 48 # @Method: excute(self) 49 # @Precondition: none 50 # @Func: 升级执行入口 51 # @PostStatus: none 52 # @eg: excute() 53 # @return: True or Flase 54 #=================================================================================== 55 ''' 56 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_linux_app") 57 58 # 执行下载 59 try: 60 if not self.download(): 61 CONSTANT.ENVERRMESSAGE = "image download fail" 62 logger.printLog(CONSTANT.ENVERRMESSAGE) 63 return False 64 except Exception as e: 65 raise e 66 67 68 # 执行升级 69 try: 70 if not self.upgrade(): 71 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 72 logger.printLog(CONSTANT.ENVERRMESSAGE) 73 return False 74 return True 75 except Exception as e: 76 raise e 77 78 @dec_stepmsg("download") 79 @timeout(1800) 80 def download(self): 81 ''' 82 #=================================================================================== 83 # @Method: download(self) 84 # @Precondition: none 85 # @Func: 构建下载到本地的路径,执行相应包的下载 86 # @PostStatus: none 87 # @eg: download() 88 # @return: True or Flase 89 #=================================================================================== 90 ''' 91 global version_savepath, version_name 92 dir_path = CONSTANT.Path.getDirPath() 93 if self.params_dict.get("pbiid"): 94 version_path = self.params_dict.get("pbiid") 95 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 96 version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 97 else: 98 version_path = self.params_dict.get("upgrade_upgradeLocation") 99 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 100 version_savepath = os.path.join(dir_path, version_name) 101 102 if self.params_dict.get("isDownload") == "True": 103 logger.printLog("不需要做下载,直接返回") 104 return True 105 106 #执行img下载 107 import hashlib 108 save_file_str = version_path.replace("/", "").replace("\\", "") 109 save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 110 logger.info("download hash value:%s" % (save_file_name)) 111 save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 112 if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False): 113 logger.error("download img fail") 114 return False 115 116 #保存本地版本路径给devicetest去版本路径下取用例 117 saveVersion(save_path_file, os.path.join(version_savepath, "img")) 118 119 return True 120 121 def excutedown(self, source_path, download_dir, suc_mark, is_file): 122 ''' 123 #=================================================================================== 124 # @Method: excutedown(source_path, download_dir, is_file) 125 # @Precondition: none 126 # @Func: 执行下载动作 127 # @PostStatus: none 128 # @Param: source_path:资源文件路径 129 # download_dir:文件下载到本地的文件夹路径 130 # is_file:是否是文件 131 # 132 # @eg: excutedown("xxxx", "D:\\local\\image", Flase, os_method) 133 # @return: True or Flase 134 #=================================================================================== 135 ''' 136 failed_mark = os.path.join(download_dir, failed_file) 137 lock_path = os.path.join(download_dir, lock_suffix) 138 file_lock = FileLock() 139 140 if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 141 return True 142 try: 143 nowtime = get_now_time_str_info() 144 logger.printLog("%s Downloading, please wait" % nowtime) 145 file_lock.lockFile(lock_path) 146 ret = "" 147 logger.info("Get lock. Start to ") 148 if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 149 ret = downloadByBitComet(source_path, download_dir, os_method) 150 elif source_path.startswith('\\\\'): 151 ret = downloadByCopy(source_path, download_dir, is_file) 152 elif self.params_dict.get("pbiid"): 153 ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid")) 154 elif source_path.startswith("http"): 155 ret = downloadFileFromDevCloud(source_path, "", "", download_dir) 156 157 if source_path.endswith(".zip"): 158 zip_name = os.path.basename(source_path) 159 ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 160 if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 161 if source_path.startswith("http") and ("file_id=" in source_path): 162 if source_path.endswith(".tar.gz"): 163 zip_name = source_path.split('=')[-1] 164 else: 165 zip_name = "out.tar.gz" 166 else: 167 zip_name = os.path.basename(source_path) 168 ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 169 nowtime = get_now_time_str_info() 170 logger.printLog("%s download to %s end" % (nowtime, download_dir)) 171 172 if not ret: 173 with open(failed_mark, "a+") as fp: 174 fp.write("") 175 return ret 176 except Exception as e: 177 logger.printLog(e) 178 raise Exception(e) 179 finally: 180 file_lock.releaseFile() 181 182 183 @dec_stepmsg("upgrade") 184 #@timeout(900) 185 def upgrade(self): 186 ''' 187 #=================================================================================== 188 # @Method: upgrade(self) 189 # @Precondition: none 190 # @Func: 升级相关业务逻辑 191 # @PostStatus: none 192 # @eg: upgrade() 193 # @return: True or Flase 194 #=================================================================================== 195 ''' 196 logger.printLog('开始升级') 197 deploy_com = self.params_dict.get("deploy_com") 198 usb_port = self.params_dict.get("usb_port") 199 baudrate = self.params_dict.get("baudrate") 200 #芯片类型,根据芯片类型获取对应的刷机命令 201 flash_type = self.params_dict.get("flash_type") 202 burn_usbport = self.params_dict.get("hiburn_usbport") 203 device_ip = self.params_dict.get("Device_IP") 204 device_netmask = self.params_dict.get("Device_Netmask") 205 device_gatewayip = self.params_dict.get("Device_GatewayIP") 206 chip_version = self.params_dict.get('chip_version') 207 chip_version = chip_version.lower() if chip_version else chip_version 208 209 if not deploy_com: 210 logger.error("deploy_com is NULL !!") 211 return False 212 if not burn_usbport: 213 logger.error("hiburn_usbport is NULL !!") 214 return False 215 if not baudrate: 216 baudrate = 115200 217 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 218 #升级需要的工具归档 219 toolworkspace = CONSTANT.OSType.getworkspace() 220 hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport) 221 logger.info("hiburn tool path is: %s" % hiburntoolpath) 222 if not os.path.exists(hiburntoolpath): 223 if not burn_usbport: 224 logger.error("hiburn_usbport is NULL !!") 225 return False 226 os.makedirs(hiburntoolpath) 227 toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip") 228 logger.info("copy %s to %s" % (toolpath, hiburntoolpath)) 229 copyFile(toolpath, hiburntoolpath) 230 zip_name = os.path.basename(toolpath) 231 ret = extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath) 232 if ret: 233 logger.info("unzip to %s succ" % (hiburntoolpath)) 234 #修改burn.config中的usb口 235 configpath = os.path.join(hiburntoolpath, "config", "burn.config") 236 all_data = "" 237 with open(configpath, "r", encoding="utf-8") as cf: 238 for line in cf: 239 if "usbDeviceNumber=" in line: 240 old_str = line 241 line = line.replace(old_str, "usbDeviceNumber=%s\r\n" % burn_usbport) 242 logger.info("replace line: %s " % line) 243 all_data += line 244 with open(configpath, "w", encoding="utf-8") as wf: 245 wf.write(all_data) 246 else: 247 logger.error("%s is not exit" % hiburntoolpath) 248 return False 249 #将升级需要的文件拷贝到镜像里面 250 local_image_path = os.path.join(version_savepath, "img") 251 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn.xml") 252 xml_path = os.path.join(local_image_path, "usb-burn.xml") 253 copyFile(old_xml_path, xml_path) 254 if flash_type.lower() == "ev300": 255 chip_type = "Hi3518EV300" 256 ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3518ev300.bin") 257 elif flash_type.lower() == "dv300_linux": 258 chip_type = "Hi3516DV300" 259 ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3516dv300.bin") 260 else: 261 logger.error("flash_type is : %s " % flash_type) 262 return False 263 copyFile(ubootpath, local_image_path) 264 scriptfile = os.path.join(scriptpath, "resource", "L1", f'{flash_type.lower()}', "update.txt") 265 logger.info(f'scriptfile:{scriptfile}') 266 267 current_path = os.getcwd() 268 logger.info("before excute hiburn,current path is: %s" % current_path) 269 os.chdir(hiburntoolpath) 270 logger.info("excute hiburn path is: %s" % os.getcwd()) 271 #擦除fastboot 272 logger.printLog('erase fastboot') 273 flash_uboot_xml = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "flash_fastboot.xml") 274 cmd = ".\jre\\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml) 275 self.eraseDevice(cmd, usb_port) 276 277 retry = 0 278 while retry < 3: 279 #usb刷机 280 cmd = ".\jre\\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path) 281 logger.info("cmd is: %s" % cmd) 282 ret, outpri = subprocess.getstatusoutput(cmd) 283 logger.info("usb upgrade result: %s " % ret) 284 logger.info("print console: %s " % outpri) 285 if ret != 0: 286 if ret == 4 and retry < 2: 287 time.sleep(10) 288 retry = retry + 1 289 logger.info('flash fail,so flash once again') 290 continue 291 logger.info(ret) 292 logger.error("hiburn usb upgrade failed!!") 293 return False 294 retry = retry + 3 295 os.chdir(current_path) 296 logger.info("hiburn upgrade end, check board status") 297 time.sleep(10) 298 try: 299 logger.info("打开serial") 300 ser = serial.Serial(deploy_com, int(baudrate), timeout=0) 301 if self.bootUpUboot(ser,scriptfile) and self.configureIp(ser, flash_type, device_ip, device_netmask, device_gatewayip) and self.checkStatus(ser): 302 return True 303 return False 304 except Exception as e: 305 logger.info(e) 306 return False 307 finally: 308 ser.close() 309 logger.info("close serial") 310 311 def bootUpUboot(self,ser,scriptfile): 312 ''' 313 启动uboot 314 ''' 315 reset_count = 0 316 while reset_count < 2: 317 if ser.is_open == False: 318 ser.open() 319 logger.info("get device status") 320 board_type = getBoardType(ser) 321 if board_type == "uboot": 322 with open(scriptfile, "r") as fp: 323 lines = fp.readlines() 324 for line in lines: 325 if not line: 326 logger.info("cmd is: %s " % line) 327 continue 328 if "reset" in line: 329 ret = sendCmd(ser, line, READ_MAXTIMEOUT) 330 continue 331 ret = sendCmd(ser, line, READ_MINITIMEOUT) 332 board_type = getBoardType(ser) 333 if board_type != "OHOS": 334 if reset_count < 2: 335 logger.info('after reset;the device status is error,reset device again,reset_count:%d' % reset_count) 336 reset_count += 1 337 time.sleep(20) 338 continue 339 logger.error("upgrade fail") 340 return False 341 return True 342 else: 343 logger.info('before reset;the device status is error,reset device again,reset_count:%d'%reset_count) 344 reset_count += 1 345 time.sleep(20) 346 continue 347 else: 348 return False 349 350 def configureIp(self,ser,flash_type, device_ip, device_netmask, device_gatewayip): 351 ''' 352 配置ip,确认配置情况 353 ''' 354 rerty_count = 0 355 # test_count = 0 356 while rerty_count <= 2: 357 if flash_type.lower() == "dv300_linux": 358 if not self.afterRebootDvConfigure(ser, device_ip, device_netmask, device_gatewayip): 359 return False 360 elif flash_type.lower() == "ev300": 361 if not self.afterRebootEvConfigure(ser): 362 return False 363 time.sleep(5) 364 365 ret = sendCmd(ser, ip_cmd, READ_MINITIMEOUT) 366 logger.info(ret) 367 # if test_count ==0 : 368 # logger.printLog('ip 配置失败') 369 # test_count = 1 370 # continue 371 if 'Reply from 192.168.18.1' in ret: 372 logger.info('ip 配置成功') 373 return True 374 elif 'Ping: sending ICMP echo request failed' in ret: 375 logger.printLog('ip 配置失败') 376 logger.info('重新配置ip') 377 rerty_count += 1 378 else: 379 return True 380 381 def afterRebootDvConfigure(self,ser, device_ip, device_netmask, device_gatewayip): 382 logger.info("dv300_linux configure ip") 383 init_cmd = "ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip) 384 sendCmd(ser, init_cmd, READ_MINITIMEOUT) 385 sendCmd(ser, 'ifconfig\r', READ_MINITIMEOUT) 386 return True 387 388 def afterRebootEvConfigure(self,ser): 389 logger.info("ev300 configuring ,setup wifi") 390 cmd = 'ls\r' 391 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 392 if not "sdcard" in ret.lower(): 393 cmd = 'mkdir /sdcard\r' 394 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 395 if "error:" in ret.lower(): 396 logger.error("mkdir /sdcard fail") 397 return False 398 cmd = 'mount /dev/mmcblk0p0 /sdcard vfat\r' 399 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 400 cmd = 'ls /sdcard\r' 401 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 402 cmd = 'cd /sdcard/wpa\r' 403 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 404 cmd = 'exec wpa_supplicant -i wlan0 -c wpa_supplicant.conf \r' 405 ret = sendCmd(ser, cmd, READ_MAXTIMEOUT) 406 if "error:" in ret.lower(): 407 logger.error("setup wifi fail") 408 return False 409 cmd = 'ifconfig\r' 410 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 411 if "error:" in ret.lower(): 412 logger.error("ifconfig fail") 413 return False 414 return True 415 416 def eraseDevice(self, cmd, usb_port): 417 ''' 418 ret 暂时没有作用,先使用out 419 ''' 420 erase_retry = 0 421 while erase_retry < 3: 422 # 通电 423 PowerOnByThread(usb_port) 424 logger.info("cmd is: %s" % cmd) 425 ret, outpri = subprocess.getstatusoutput(cmd) 426 logger.info("flash fastboot result: %s " % ret) 427 logger.info("print console: %s " % outpri) 428 is_earse_again = any([True if item in outpri else False for item in error_str_list]) 429 if ret == 0 and erase_retry < 2 and not is_earse_again : 430 logger.info('檫除成功'.center(20,'*')) 431 break 432 elif is_earse_again: 433 logger.info('檫除存在问题 重新上下电 重新檫除') 434 erase_retry += 1 435 time.sleep(5) 436 continue 437 else: 438 logger.info('other error') 439 return False 440 else: 441 return False 442 443 def checkStatus(self, ser): 444 times = 0 445 while times < 3: 446 ret1 = sendCmd(ser, 'ps -elf', READ_TIMEOUT) 447 list_ret = re.findall('appspawn', ret1) 448 logger.info(list_ret) 449 number = list_ret.count('appspawn') 450 logger.info(number) 451 if number >= 2: 452 return True 453 else: 454 times += 1 455 time.sleep(3) 456 logger.info("No two appspawn processes, please check the device!") 457 return False 458 459def PowerOnByThread(usb_port,wait_time=10): 460 thread = Thread(target=boardPowerOn, args=[usb_port, wait_time]) 461 thread.start() 462 logger.info("thread board power on start") 463 464 465def boardPowerOn(usb_port, waittime): 466 logger.info("board power on start") 467 time.sleep(waittime) 468 469 #对端口下电 470 if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"): 471 logger.error("board power off failed") 472 return False 473 474 #对端口上电 475 if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"): 476 logger.error("board power on failed") 477 return False 478 logger.info("board power on end") 479 480 481def getBoardType(ser): 482 ret = sendCmd(ser, '\r', READ_TIMEOUT) 483 # if 'HMOS' in ret or 'OHOS' in ret or '#' in ret: 484 # ostype = 'OHOS' 485 # elif 'hisilicon' in ret: 486 # ostype = 'uboot' 487 if 'hisilicon' in ret: 488 ostype = 'uboot' 489 elif 'HMOS' in ret or 'OHOS' in ret or '#' in ret: 490 ostype = 'OHOS' 491 elif ' #' in ret: 492 ostype = 'linux' 493 else: 494 ostype = 'bootrom' 495 logger.info("board type is: %s" % ostype) 496 return ostype 497 498def sendCmd(ser, cmd, timeout): 499 logger.info("cmd is: %s " % cmd) 500 ser.write((cmd + '\n').encode()) 501 time.sleep(5) 502 ret = '' 503 i = 0 504 while True: 505 out = ser.read(ser.inWaiting()) 506 if not out: 507 break 508 if i > 12: 509 break 510 ret = ret + out.decode(encoding="utf-8", errors="ignore") 511 time.sleep(timeout) 512 i = i + 1 513 logger.info("result is: %s " % ret) 514 return ret 515 516 517 518if __name__ == "__main__": 519 param_file = sys.argv[1] 520 if not param_file: 521 logger.printLog("Missing params file") 522 sys.exit(-1) 523 try: 524 uphandle = liteOsUpgrade_linux(param_file) 525 uphandle._excuteApp() 526 except Exception as e: 527 logger.printLog(e) 528 sys.exit(-1) 529 530