1#-*- coding:utf-8 -*- 2import uuid 3import sys 4import subprocess 5import os 6import serial 7 8from core.base import BaseApp, dec_stepmsg 9from util.file_locker import FileLock 10from util.log_info import logger 11from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 12from aw.Telnet.TelnetClient import TelConnect 13from aw.Common.Constant import CONSTANT 14from aw.Download.Download import * 15from aw.Common.Common import getHostIp, copyFile, copyDirectory 16from aw.ExtractFile.ExtractFile import * 17from aw.poweronoff.serial_power_on_off import usbPowerOnOff 18 19lock_suffix = CONSTANT.File.LOCK_SUFFIX #通过文件锁实现并发下载 20suc_file = CONSTANT.File.SUC_FILE #通过本文件是区分版本是否成功下载 21failed_file = CONSTANT.File.FAILED_FILE #通过本文件是标记文件下载失败 22READ_MAXTIMEOUT = 60 23READ_TIMEOUT = 30 24READ_MINITIMEOUT = 5 25uboot_finish = 'hisilicon #' 26cmd_finish = ' #' 27 28class liteOsUpgrade_L1_shequ_test(BaseApp): 29 ''' 30 @author: w00278233 31 ''' 32 33 def __init__(self, param_file): 34 super().__init__(param_file) 35 self.param_List = ["deploy_com", 36 "usb_port", 37 "upgrade_upgradeLocation"] 38 39 @dec_stepmsg("hongmeng L1 flash") 40 def excute(self): 41 ''' 42 #=================================================================================== 43 # @Method: excute(self) 44 # @Precondition: none 45 # @Func: 升级执行入口 46 # @PostStatus: none 47 # @eg: excute() 48 # @return: True or Flase 49 #=================================================================================== 50 ''' 51 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_L1_shequ_test_app") 52 53 # 执行下载 54 try: 55 if not self.download(): 56 CONSTANT.ENVERRMESSAGE = "image download fail" 57 logger.printLog(CONSTANT.ENVERRMESSAGE) 58 return False 59 except Exception as e: 60 raise e 61 62 63 # 执行升级 64 try: 65 if not self.upgrade(): 66 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 67 logger.printLog(CONSTANT.ENVERRMESSAGE) 68 return False 69 return True 70 except Exception as e: 71 raise e 72 73 @dec_stepmsg("download") 74 @timeout(1800) 75 def download(self): 76 ''' 77 #=================================================================================== 78 # @Method: download(self) 79 # @Precondition: none 80 # @Func: 构建下载到本地的路径,执行相应包的下载 81 # @PostStatus: none 82 # @eg: download() 83 # @return: True or Flase 84 #=================================================================================== 85 ''' 86 global version_savepath, version_name 87 dir_path = CONSTANT.Path.getDirPath() 88 if self.params_dict.get("pbiid"): 89 version_path = self.params_dict.get("pbiid") 90 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 91 version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 92 else: 93 version_path = self.params_dict.get("upgrade_upgradeLocation") 94 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 95 version_savepath = os.path.join(dir_path, version_name) 96 97 if self.params_dict.get("isDownload") == "True": 98 logger.printLog("不需要做下载,直接返回") 99 return True 100 101 #执行img下载 102 import hashlib 103 save_file_str = version_path.replace("/", "").replace("\\", "") 104 save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 105 logger.info("download hash value:%s" % (save_file_name)) 106 save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 107 if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False): 108 logger.error("download img fail") 109 return False 110 111 #保存本地版本路径给devicetest去版本路径下取用例 112 saveVersion(save_path_file, os.path.join(version_savepath, "img")) 113 114 return True 115 116 def excutedown(self, source_path, download_dir, suc_mark, is_file): 117 ''' 118 #=================================================================================== 119 # @Method: excutedown(source_path, download_dir, is_file) 120 # @Precondition: none 121 # @Func: 执行下载动作 122 # @PostStatus: none 123 # @Param: source_path:资源文件路径 124 # download_dir:文件下载到本地的文件夹路径 125 # is_file:是否是文件 126 # 127 # @eg: excutedown("xxxx", "D:\\local\\image", Flase, os_method) 128 # @return: True or Flase 129 #=================================================================================== 130 ''' 131 failed_mark = os.path.join(download_dir, failed_file) 132 lock_path = os.path.join(download_dir, lock_suffix) 133 file_lock = FileLock() 134 135 if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 136 return True 137 try: 138 nowtime = get_now_time_str_info() 139 logger.printLog("%s Downloading, please wait" % nowtime) 140 file_lock.lockFile(lock_path) 141 ret = "" 142 logger.info("Get lock. Start to ") 143 if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 144 ret = downloadByBitComet(source_path, download_dir, os_method) 145 elif source_path.startswith('\\\\'): 146 ret = downloadByCopy(source_path, download_dir, is_file) 147 elif self.params_dict.get("pbiid"): 148 ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid")) 149 elif source_path.startswith("http"): 150 ret = downloadFileFromDevCloud(source_path, "", "", download_dir) 151 152 if source_path.endswith(".zip"): 153 zip_name = os.path.basename(source_path) 154 ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 155 if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 156 if source_path.startswith("http") and ("file_id=" in source_path): 157 if source_path.endswith(".tar.gz"): 158 zip_name = source_path.split('=')[-1] 159 else: 160 zip_name = "out.tar.gz" 161 else: 162 zip_name = os.path.basename(source_path) 163 ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 164 nowtime = get_now_time_str_info() 165 logger.printLog("%s download to %s end" % (nowtime, download_dir)) 166 167 if not ret: 168 with open(failed_mark, "a+") as fp: 169 fp.write("") 170 return ret 171 except Exception as e: 172 logger.printLog(e) 173 raise Exception(e) 174 finally: 175 file_lock.releaseFile() 176 177 178 @dec_stepmsg("upgrade") 179 #@timeout(900) 180 def upgrade(self): 181 ''' 182 #=================================================================================== 183 # @Method: upgrade(self) 184 # @Precondition: none 185 # @Func: 升级相关业务逻辑 186 # @PostStatus: none 187 # @eg: upgrade() 188 # @return: True or Flase 189 #=================================================================================== 190 ''' 191 logger.printLog('开始升级') 192 deploy_com = self.params_dict.get("deploy_com") 193 usb_port = self.params_dict.get("usb_port") 194 baudrate = self.params_dict.get("baudrate") 195 #芯片类型,根据芯片类型获取对应的刷机命令 196 flash_type = self.params_dict.get("flash_type") 197 burn_usbport = self.params_dict.get("hiburn_usbport") 198 device_ip = self.params_dict.get("Device_IP") 199 device_netmask = self.params_dict.get("Device_Netmask") 200 device_gatewayip = self.params_dict.get("Device_GatewayIP") 201 chip_version = self.params_dict.get('chip_version') 202 chip_version = chip_version.lower() if chip_version else chip_version 203 204 if not deploy_com: 205 logger.error("deploy_com is NULL !!") 206 return False 207 if not burn_usbport: 208 logger.error("hiburn_usbport is NULL !!") 209 return False 210 if not baudrate: 211 baudrate = 115200 212 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 213 #升级需要的工具归档 214 logger.printLog('归档工具') 215 toolworkspace = CONSTANT.OSType.getworkspace() 216 hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport) 217 logger.info("hiburn tool path is: %s" % hiburntoolpath) 218 if not os.path.exists(hiburntoolpath): 219 if not burn_usbport: 220 logger.error("hiburn_usbport is NULL !!") 221 return False 222 os.makedirs(hiburntoolpath) 223 toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip") 224 logger.info("copy %s to %s" % (toolpath, hiburntoolpath)) 225 copyFile(toolpath, hiburntoolpath) 226 zip_name = os.path.basename(toolpath) 227 ret = extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath) 228 if ret: 229 logger.info("unzip to %s succ" % (hiburntoolpath)) 230 #修改burn.config中的usb口 231 configpath = os.path.join(hiburntoolpath, "config", "burn.config") 232 all_data = "" 233 with open(configpath, "r", encoding="utf-8") as cf: 234 for line in cf: 235 if "usbDeviceNumber=" in line: 236 old_str = line 237 line = line.replace(old_str, "usbDeviceNumber=%s\r\n" % burn_usbport) 238 logger.info("replace line: %s " % line) 239 all_data += line 240 with open(configpath, "w", encoding="utf-8") as wf: 241 wf.write(all_data) 242 else: 243 logger.error("%s is not exit" % hiburntoolpath) 244 return False 245 #将升级需要的文件拷贝到镜像里面 246 logger.printLog('拷贝镜像') 247 local_image_path = os.path.join(version_savepath, "img") 248 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn.xml") 249 xml_path = os.path.join(local_image_path, "usb-burn.xml") 250 if chip_version == 'hi3518': 251 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn-jffs2.xml") 252 elif chip_version == 'hi3516': 253 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn-vfat.xml") 254 copyFile(old_xml_path, xml_path) 255 if flash_type.lower() == "ev300": 256 chip_type = "Hi3518EV300" 257 ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3518ev300.bin") 258 elif flash_type.lower() == "dv300": 259 chip_type = "Hi3516DV300" 260 ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3516dv300.bin") 261 else: 262 logger.error("flash_type is : %s " % flash_type) 263 return False 264 copyFile(ubootpath, local_image_path) 265 scriptfile = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "update.txt") 266 267 from threading import Thread 268 thread = Thread(target=boardPowerOn, args=[usb_port, 10]) 269 thread.start() 270 271 current_path = os.getcwd() 272 logger.info("before excute hiburn,current path is: %s" % current_path) 273 os.chdir(hiburntoolpath) 274 logger.info("excute hiburn path is: %s" % os.getcwd()) 275 #擦除fastboot 276 logger.printLog('擦除fastboot') 277 flash_uboot_xml = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "flash_fastboot.xml") 278 cmd = ".\jre\\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml) 279 logger.info("cmd is: %s" % cmd) 280 ret, outpri = subprocess.getstatusoutput(cmd) 281 logger.info("flash fastboot result: %s " % ret) 282 logger.info("print console: %s " % outpri) 283 284 retry = 0 285 while retry < 2: 286 287 #usb刷机 288 logger.printLog('开始刷机') 289 cmd = ".\jre\\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path) 290 logger.info("cmd is: %s" % cmd) 291 ret, outpri = subprocess.getstatusoutput(cmd) 292 logger.info("usb upgrade result: %s " % ret) 293 logger.info("print console: %s " % outpri) 294 if ret != 0: 295 if ret == 4: 296 time.sleep(10) 297 retry = retry + 1 298 logger.printLog('刷机未成功再刷一次') 299 continue 300 logger.printLog(ret) 301 logger.error("hiburn usb upgrade failed!!") 302 return False 303 retry = retry + 3 304 os.chdir(current_path) 305 logger.info("hiburn upgrade end, check board status") 306 time.sleep(5) 307 ser = serial.Serial(deploy_com, int(baudrate), timeout=1) 308 reset_count = 0 309 try: 310 while reset_count < 2: 311 logger.info("打开serial") 312 if ser.is_open == False: 313 ser.open() 314 315 logger.info("获取单板状态") 316 board_type = getBoardType(ser) 317 logger.printLog('before reset') 318 logger.printLog(f'board_type:{board_type}') 319 if board_type == "uboot": 320 with open(scriptfile, "r") as fp: 321 lines = fp.readlines() 322 for line in lines: 323 if not line: 324 logger.info("cmd is: %s " % line) 325 continue 326 if "reset" in line: 327 ret = sendCmd(ser, line, READ_MAXTIMEOUT) 328 continue 329 ret = sendCmd(ser, line, READ_MINITIMEOUT) 330 board_type = getBoardType(ser) 331 logger.printLog('after reset') 332 logger.printLog(f'board_type:{board_type}') 333 334 if True: 335 reset_count += 1 336 logger.printLog('再次重启') 337 continue 338 time.sleep(1000) 339 if flash_type.lower() == "dv300": 340 logger.info("upgrade success") 341 init_cmd = "ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip) 342 sendCmd(ser, init_cmd, READ_MINITIMEOUT) 343 sendCmd(ser, 'ifconfig\r', READ_MINITIMEOUT) 344 return True 345 elif flash_type.lower() == "ev300": 346 logger.info("setup wifi") 347 cmd = 'ls\r' 348 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 349 if not "sdcard" in ret.lower(): 350 cmd = 'mkdir /sdcard\r' 351 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 352 if "error:" in ret.lower(): 353 logger.error("mkdir /sdcard fail") 354 return False 355 cmd = 'mount /dev/mmcblk0p0 /sdcard vfat\r' 356 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 357 cmd = 'ls /sdcard\r' 358 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 359 cmd = 'cd /sdcard/wpa\r' 360 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 361 cmd = 'exec wpa_supplicant -i wlan0 -c wpa_supplicant.conf \r' 362 ret = sendCmd(ser, cmd, READ_MAXTIMEOUT) 363 if "error:" in ret.lower(): 364 logger.error("setup wifi fail") 365 return False 366 cmd = 'ifconfig\r' 367 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 368 if "error:" in ret.lower(): 369 logger.error("ifconfig fail") 370 return False 371 logger.info("upgrade success") 372 return True 373 else: 374 logger.error("upgrade fail") 375 return False 376 except Exception as e: 377 logger.printLog(e) 378 return False 379 finally: 380 ser.close() 381 logger.info("close serial") 382 383 384def boardPowerOn(usb_port, waittime): 385 logger.info("board power on start") 386 time.sleep(waittime) 387 388 #对端口下电 389 if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"): 390 logger.error("board power off failed") 391 return False 392 393 #对端口上电 394 if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"): 395 logger.error("board power on failed") 396 return False 397 logger.info("board power on end") 398 399 400def getBoardType(ser): 401 ret = sendCmd(ser, '\r', READ_TIMEOUT) 402 if 'HMOS' in ret or 'OHOS' in ret: 403 ostype = 'OHOS' 404 elif 'hisilicon' in ret: 405 ostype = 'uboot' 406 elif ' #' in ret: 407 ostype = 'linux' 408 else: 409 ostype = 'bootrom' 410 logger.info("board type is: %s" % ostype) 411 return ostype 412 413def sendCmd(ser, cmd, timeout): 414 logger.info("cmd is: %s " % cmd) 415 ser.write((cmd + '\n').encode()) 416 time.sleep(0.5) 417 ret = '' 418 i = 0 419 while True: 420 out = ser.read(ser.inWaiting()) 421 if not out: 422 break 423 if i > 2: 424 break 425 ret = ret + out.decode(encoding="utf-8", errors="ignore") 426 time.sleep(timeout) 427 i = i + 1 428 logger.info("result is: %s " % ret) 429 return ret 430 431 432 433if __name__ == "__main__": 434 param_file = sys.argv[1] 435 if not param_file: 436 logger.printLog("Missing params file") 437 sys.exit(-1) 438 try: 439 uphandle = liteOsUpgrade_L1(param_file) 440 uphandle._excuteApp() 441 except Exception as e: 442 logger.printLog(e) 443 sys.exit(-1) 444