# -*- coding: utf-8 -*-
"""Download an image package and burn it onto a board with the HiBurn CLI.

The step first downloads the image into a local cache directory, then erases
fastboot over the serial port and finally burns the image in USB bootrom mode.
"""
import hashlib
import os
import subprocess
import sys
import time
import uuid

import serial

from core.base import BaseApp, dec_stepmsg
from util.file_locker import FileLock
from util.log_info import logger
from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
from aw.Telnet.TelnetClient import TelConnect
from aw.Common.Constant import CONSTANT
from aw.Download.Download import *
from aw.Common.Common import getHostIp, copyFile, copyDirectory
from aw.ExtractFile.ExtractFile import *
from aw.poweronoff.serial_power_on_off import usbPowerOnOff
# Deliberately kept after the wildcard imports so this binding of Thread wins.
from threading import Thread

lock_suffix = CONSTANT.File.LOCK_SUFFIX  # lock file used to serialize concurrent downloads
suc_file = CONSTANT.File.SUC_FILE  # marker file: the version was downloaded successfully
failed_file = CONSTANT.File.FAILED_FILE  # marker file: the download failed
READ_MAXTIMEOUT = 20
READ_TIMEOUT = 30
READ_MINITIMEOUT = 5
uboot_finish = 'hisilicon #'
cmd_finish = ' #'

# flash_type (lower-cased) -> chip name passed to hiburn with "-n".
_CHIP_TYPES = {
    "ev300": "Hi3518EV300",
    "dv300": "Hi3516DV300",
}
_ERASE_MAX_ATTEMPTS = 3
_BURN_MAX_ATTEMPTS = 3
_SEND_CMD_MAX_READS = 3


class liteOsUpgrade_L2(BaseApp):
    """Deployment step: download an image, then flash it with HiBurn."""

    def __init__(self, param_file):
        super().__init__(param_file)
        # Parameter names associated with this step; how the base class uses
        # the list is not visible here (presumably validation - confirm).
        self.param_List = ["deploy_com",
                           "usb_port",
                           "upgrade_upgradeLocation"]

    @dec_stepmsg("hongmeng L2 flash")
    def excute(self):
        """Entry point of the step: download the image, then upgrade the board.

        Returns:
            True when both the download and the upgrade succeed, otherwise
            False (CONSTANT.ENVERRMESSAGE is set to the failing phase).

        Raises:
            Any exception raised by download() or upgrade() propagates.
        """
        if not self.download():
            CONSTANT.ENVERRMESSAGE = "image download fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False

        if not self.upgrade():
            CONSTANT.ENVERRMESSAGE = "board upgrade fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False
        return True

    @dec_stepmsg("download")
    @timeout(18000)
    def download(self):
        """Build the local save path and download the image package into it.

        The module-level globals ``version_savepath`` and ``version_name`` are
        (re)assigned here; upgrade() and excutedown() read ``version_savepath``.

        Returns:
            True on success (or when the "isDownload" parameter is "True", in
            which case nothing is downloaded), False when the download fails.
        """
        global version_savepath, version_name
        dir_path = CONSTANT.Path.getDirPath()
        pbiid = self.params_dict.get("pbiid")
        if pbiid:
            version_path = pbiid
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(pbiid) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, version_path))
            version_savepath = os.path.join(dir_path, version_name)

        # The save path globals above are set even when the download is skipped.
        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            return True

        # Download the image; the record file name is the SHA-1 of the source
        # path with all slashes and backslashes stripped.
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash value:%s" % (save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        logger.printLog('save_path_file: %s' % save_path_file)
        image_dir = os.path.join(version_savepath, "img")
        if not self.excutedown(version_path, image_dir, save_path_file, False):
            logger.error("download img fail")
            return False

        # Record the local version path so that devicetest can fetch the test
        # cases from it.
        saveVersion(save_path_file, image_dir)

        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        """Download (and unpack) ``source_path`` into ``download_dir``.

        Args:
            source_path: location of the resource; the download method is
                chosen from its prefix and from the step parameters.
            download_dir: local directory the resource is downloaded into.
            suc_mark: record file passed to isDownLoadSuccess() to detect an
                already completed download.
            is_file: forwarded to downloadByCopy() for paths that start with
                two backslashes.

        Returns:
            The (truthy/falsy) result of the last download or extract call;
            True immediately when the download was already done.

        Raises:
            Exception: wraps any error raised while downloading or unpacking.
        """
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            if self.params_dict.get("bt_enable") == "True":
                # NOTE(review): os_method is not defined in this file; unless a
                # wildcard import provides it this branch raises NameError.
                ret = downloadByBitComet(source_path, download_dir, os_method)
            elif source_path.startswith('\\\\'):
                ret = downloadByCopy(source_path, download_dir, is_file)
            elif self.params_dict.get("pbiid"):
                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
            elif source_path.startswith("http"):
                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)

            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
            is_file_id_url = source_path.startswith("http") and ("file_id=" in source_path)
            if source_path.endswith(".tar.gz") or is_file_id_url:
                if is_file_id_url:
                    # The archive name is taken from the last "=" value when
                    # the URL ends with .tar.gz, otherwise a fixed name is used.
                    if source_path.endswith(".tar.gz"):
                        zip_name = source_path.split('=')[-1]
                    else:
                        zip_name = "out.tar.gz"
                else:
                    zip_name = os.path.basename(source_path)
                logger.printLog(f'tar_file:{os.path.join(download_dir, zip_name)},dest_file:{download_dir}')
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # Create (touch) the failure marker file.
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            logger.printLog(e)
            raise Exception(e) from e
        finally:
            file_lock.releaseFile()

    @dec_stepmsg("upgrade")
    # @timeout(900)
    def upgrade(self):
        """Burn the downloaded image onto the board with the HiBurn CLI.

        Reads ``deploy_com``, ``usb_port``, ``flash_type`` and
        ``hiburn_usbport`` from the step parameters and the module-level
        ``version_savepath`` set by download(). The process working directory
        is switched to the HiBurn tool directory while the tool runs and is
        always restored afterwards.

        Returns:
            True when fastboot was erased and the image was burned, otherwise
            False.
        """
        logger.printLog('开始升级')
        deploy_com = self.params_dict.get("deploy_com")
        usb_port = self.params_dict.get("usb_port")
        # Chip type; selects the chip name and resource files used for flashing.
        flash_type = self.params_dict.get("flash_type")
        burn_usbport = self.params_dict.get("hiburn_usbport")

        if not deploy_com:
            logger.error("deploy_com is NULL !!")
            return False
        if not burn_usbport:
            logger.error("hiburn_usbport is NULL !!")
            return False

        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
        hiburntoolpath = _prepare_hiburn_tool(scriptpath, burn_usbport)
        if hiburntoolpath is None:
            return False

        chip_type = _CHIP_TYPES.get(flash_type.lower()) if flash_type else None
        if chip_type is None:
            logger.error("flash_type is : %s " % flash_type)
            return False

        # Copy the partition description needed for the upgrade next to the image.
        local_image_path = os.path.join(version_savepath, "img")
        old_xml_path = os.path.join(scriptpath, "resource", "L2", flash_type, "Hi3516DV300-emmc.xml")
        xml_path = os.path.join(local_image_path, "Hi3516DV300-emmc.xml")
        copyFile(old_xml_path, xml_path)

        scriptfile = os.path.join(scriptpath, "resource", "L2", f'{flash_type.lower()}', "update.txt")
        logger.info(f'scriptfile:{scriptfile}')
        flash_uboot_xml = os.path.join(scriptpath, "resource", "L2", flash_type.lower(), "flash_fastboot.xml")

        current_path = os.getcwd()
        logger.info("before excute hiburn,current path is: %s" % current_path)
        # The hiburn commands use paths relative to the tool directory.
        os.chdir(hiburntoolpath)
        try:
            logger.info("excute hiburn path is: %s" % os.getcwd())
            if not _erase_fastboot(chip_type, deploy_com, usb_port, flash_uboot_xml):
                return False
            if not _burn_by_usb(chip_type, xml_path):
                return False
        finally:
            # Restore the working directory on success and on every failure path.
            os.chdir(current_path)
        logger.info("hiburn upgrade end, check board status")
        return True


def _prepare_hiburn_tool(scriptpath, burn_usbport):
    """Return the per-USB-port HiBurn tool directory, creating it if missing.

    When the directory does not exist the bundled HiBurnCmdLine.zip is copied
    and unpacked into it and ``usbDeviceNumber`` in config/burn.config is set
    to ``burn_usbport``. Returns None when the archive cannot be unpacked.
    """
    toolworkspace = CONSTANT.OSType.getworkspace()
    hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
    logger.info("hiburn tool path is: %s" % hiburntoolpath)
    if os.path.exists(hiburntoolpath):
        return hiburntoolpath

    os.makedirs(hiburntoolpath)
    toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
    logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
    copyFile(toolpath, hiburntoolpath)
    zip_name = os.path.basename(toolpath)
    if not extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath):
        logger.error("%s is not exit" % hiburntoolpath)
        return None
    logger.info("unzip to %s succ" % (hiburntoolpath))

    # Point burn.config at the requested USB port.
    configpath = os.path.join(hiburntoolpath, "config", "burn.config")
    patched_lines = []
    with open(configpath, "r", encoding="utf-8") as cf:
        for line in cf:
            if "usbDeviceNumber=" in line:
                line = "usbDeviceNumber=%s\r\n" % burn_usbport
                logger.info("replace line: %s " % line)
            patched_lines.append(line)
    with open(configpath, "w", encoding="utf-8") as wf:
        wf.write("".join(patched_lines))
    return hiburntoolpath


def _erase_fastboot(chip_type, deploy_com, usb_port, flash_uboot_xml):
    """Erase fastboot over the serial port; return True on success.

    Each attempt schedules a delayed power cycle of the board and then runs
    hiburn. Output containing 'Unknown' is treated as a serial-port problem
    and retried, up to _ERASE_MAX_ATTEMPTS attempts in total; any other
    failure aborts immediately. Must run with the HiBurn tool directory as
    the working directory.
    """
    logger.printLog('erase fastboot')
    for _ in range(_ERASE_MAX_ATTEMPTS):
        # Power-cycle the board in the background while hiburn waits for it.
        PowerOnByThread(usb_port)
        cmd = ".\\jre\\bin\\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
        logger.info("cmd is: %s" % cmd)
        ret, outpri = subprocess.getstatusoutput(cmd)
        logger.info("flash fastboot result: %s " % ret)
        logger.info("print console: %s " % outpri)
        if 'Unknown' in outpri:
            logger.info('串口问题 重新上下电 重新檫除')
            time.sleep(5)
            continue
        if ret == 0:
            return True
        logger.info('other error')
        return False
    return False


def _burn_by_usb(chip_type, xml_path):
    """Burn the image in USB bootrom mode; return True on success.

    Exit status 4 is retried after a 10 second pause, up to
    _BURN_MAX_ATTEMPTS attempts in total; any other non-zero status fails
    immediately. Must run with the HiBurn tool directory as the working
    directory.
    """
    for attempt in range(_BURN_MAX_ATTEMPTS):
        cmd = ".\\jre\\bin\\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
        logger.info("cmd is: %s" % cmd)
        ret, outpri = subprocess.getstatusoutput(cmd)
        logger.info("usb upgrade result: %s " % ret)
        logger.info("print console: %s " % outpri)
        if ret == 0:
            return True
        if ret == 4 and attempt < _BURN_MAX_ATTEMPTS - 1:
            time.sleep(10)
            logger.info('flash fail,so flash once again')
            continue
        logger.info(ret)
        logger.error("hiburn usb upgrade failed!!")
        return False
    return False


def PowerOnByThread(usb_port, wait_time=10):
    """Start a background thread that power-cycles ``usb_port`` after ``wait_time`` seconds."""
    thread = Thread(target=boardPowerOn, args=[usb_port, wait_time])
    thread.start()
    logger.info("thread board power on start")


def boardPowerOn(usb_port, waittime):
    """Wait ``waittime`` seconds, then switch the port off and on again.

    Returns:
        True when both power operations succeed, False as soon as one fails.
    """
    logger.info("board power on start")
    time.sleep(waittime)

    # Power the port off.
    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"):
        logger.error("board power off failed")
        return False

    # Power the port on.
    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"):
        logger.error("board power on failed")
        return False
    logger.info("board power on end")
    return True


def getBoardType(ser):
    """Classify the board state from its reply to a carriage return.

    Returns:
        'OHOS', 'uboot', 'linux' or 'bootrom', chosen by the first marker
        found in the reply (in that order of precedence).
    """
    ret = sendCmd(ser, '\r', READ_TIMEOUT)
    if 'HMOS' in ret or 'OHOS' in ret:
        ostype = 'OHOS'
    elif 'hisilicon' in ret:
        ostype = 'uboot'
    elif ' #' in ret:
        ostype = 'linux'
    else:
        ostype = 'bootrom'
    logger.info("board type is: %s" % ostype)
    return ostype


def sendCmd(ser, cmd, timeout):
    """Write ``cmd`` to the serial port and return the decoded reply.

    Args:
        ser: serial port object providing write(), read() and inWaiting().
        cmd: command text; a newline is appended before sending.
        timeout: seconds to sleep after each non-empty read before polling
            the port again.

    Returns:
        The reply decoded as UTF-8 (undecodable bytes are dropped). At most
        _SEND_CMD_MAX_READS chunks are collected; reading stops early at the
        first empty read.
    """
    logger.info("cmd is: %s " % cmd)
    ser.write((cmd + '\n').encode())
    time.sleep(0.5)
    received = bytearray()
    for _ in range(_SEND_CMD_MAX_READS):
        out = ser.read(ser.inWaiting())
        if not out:
            break
        received += out
        time.sleep(timeout)
    # Decode once so a multi-byte character split across reads is not lost.
    ret = received.decode(encoding="utf-8", errors="ignore")
    logger.info("result is: %s " % ret)
    return ret