1#-*- coding:utf-8 -*- 2import uuid 3import sys 4import subprocess 5import os 6 7from core.base import BaseApp, dec_stepmsg 8from util.file_locker import FileLock 9from util.log_info import logger 10from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 11from aw.Telnet.TelnetClient import TelConnect 12from aw.Common.Constant import CONSTANT 13from aw.Download.Download import * 14from aw.Common.Common import getHostIp 15from aw.ExtractFile.ExtractFile import * 16from aw.poweronoff.serial_power_on_off import serialPowerOnOff 17 18lock_suffix = CONSTANT.File.LOCK_SUFFIX #通过文件锁实现并发下载 19suc_file = CONSTANT.File.SUC_FILE #通过本文件是区分版本是否成功下载 20failed_file = CONSTANT.File.FAILED_FILE #通过本文件是标记文件下载失败 21READ_MAXTIMEOUT = 600 22READ_TIMEOUT = 30 23READ_MINITIMEOUT = 5 24uboot_finish = 'hisilicon #' 25cmd_finish = ' #' 26 27class liteOsUpgrade_L1(BaseApp): 28 ''' 29 @author: w00278233 30 ''' 31 32 def __init__(self, param_file): 33 super().__init__(param_file) 34 self.param_List = ["Telnet_IP", 35 "Telnet_Port", 36 "upgrade_upgradeLocation"] 37 38 @dec_stepmsg("hongmeng L1 flash") 39 def excute(self): 40 ''' 41 #=================================================================================== 42 # @Method: excute(self) 43 # @Precondition: none 44 # @Func: 升级执行入口 45 # @PostStatus: none 46 # @eg: excute() 47 # @return: True or Flase 48 #=================================================================================== 49 ''' 50 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_L1_app") 51 52 # 执行下载 53 try: 54 if not self.download(): 55 CONSTANT.ENVERRMESSAGE = "image download fail" 56 logger.printLog(CONSTANT.ENVERRMESSAGE) 57 return False 58 except Exception as e: 59 raise e 60 61 # 执行升级 62 try: 63 if not self.upgrade(): 64 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 65 logger.printLog(CONSTANT.ENVERRMESSAGE) 66 return False 67 return True 68 except Exception as e: 69 raise e 70 71 @dec_stepmsg("download") 72 @timeout(1800) 73 def download(self): 74 ''' 75 #=================================================================================== 76 # @Method: download(self) 77 # @Precondition: none 78 # @Func: 构建下载到本地的路径,执行相应包的下载 79 # @PostStatus: none 80 # @eg: download() 81 # @return: True or Flase 82 #=================================================================================== 83 ''' 84 global version_savepath, version_name 85 dir_path = CONSTANT.Path.getDirPath() 86 if self.params_dict.get("pbiid"): 87 version_path = self.params_dict.get("pbiid") 88 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 89 version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 90 else: 91 version_path = self.params_dict.get("upgrade_upgradeLocation") 92 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 93 version_savepath = os.path.join(dir_path, version_name) 94 95 if self.params_dict.get("isDownload") == "True": 96 logger.printLog("不需要做下载,直接返回") 97 return True 98 99 #执行img下载 100 import hashlib 101 save_file_str = version_path.replace("/", "").replace("\\", "") 102 save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 103 logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name)) 104 save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 105 if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False): 106 logger.error("download img fail") 107 return False 108 109 #保存本地版本路径给devicetest去版本路径下取用例 110 saveVersion(save_path_file, os.path.join(version_savepath, "img")) 111 112 # 执行升级脚本下载 113 if self.params_dict.get("upgrade_script"): 114 suc_mark = os.path.join(version_savepath, "scriptfile", suc_file) 115 if not self.excutedown(self.params_dict.get("upgrade_script"), 116 os.path.join(version_savepath, "scriptfile"), suc_mark, True): 117 logger.error("download upgrade script fail") 118 return False 119 with open(suc_mark, "a+") as fp: 120 fp.write("") 121 122 return True 123 124 def excutedown(self, source_path, download_dir, suc_mark, is_file): 125 ''' 126 #=================================================================================== 127 # @Method: excutedown(source_path, download_dir, is_file) 128 # @Precondition: none 129 # @Func: 执行下载动作 130 # @PostStatus: none 131 # @Param: source_path:资源文件路径 132 # download_dir:文件下载到本地的文件夹路径 133 # is_file:是否是文件 134 # 135 # @eg: excutedown("xxxx", "D:\\local\\image", Flase, os_method) 136 # @return: True or Flase 137 #=================================================================================== 138 ''' 139 failed_mark = os.path.join(download_dir, failed_file) 140 lock_path = os.path.join(download_dir, lock_suffix) 141 file_lock = FileLock() 142 143 if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 144 return True 145 try: 146 nowtime = get_now_time_str_info() 147 logger.printLog("%s Downloading, please wait" % nowtime) 148 file_lock.lockFile(lock_path) 149 ret = "" 150 logger.info("Get lock. Start to ") 151 if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 152 ret = downloadByBitComet(source_path, download_dir, os_method) 153 elif source_path.startswith('\\\\'): 154 ret = downloadByCopy(source_path, download_dir, is_file) 155 elif self.params_dict.get("pbiid"): 156 ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid")) 157 elif source_path.startswith("http"): 158 ret = downloadFileFromDevCloud(source_path, "", "", download_dir) 159 160 if source_path.endswith(".zip"): 161 zip_name = os.path.basename(source_path) 162 ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 163 if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 164 if source_path.startswith("http") and ("file_id=" in source_path): 165 if source_path.endswith(".tar.gz"): 166 zip_name = source_path.split('=')[-1] 167 else: 168 zip_name = "out.tar.gz" 169 else: 170 zip_name = os.path.basename(source_path) 171 ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 172 nowtime = get_now_time_str_info() 173 logger.printLog("%s download to %s end" % (nowtime, download_dir)) 174 175 if not ret: 176 with open(failed_mark, "a+") as fp: 177 fp.write("") 178 return ret 179 except Exception as e: 180 logger.printLog(e) 181 raise Exception(e) 182 finally: 183 file_lock.releaseFile() 184 185 186 @dec_stepmsg("upgrade") 187 @timeout(900) 188 def upgrade(self): 189 ''' 190 #=================================================================================== 191 # @Method: upgrade(self) 192 # @Precondition: none 193 # @Func: 升级相关业务逻辑 194 # @PostStatus: none 195 # @eg: upgrade() 196 # @return: True or Flase 197 #=================================================================================== 198 ''' 199 200 tel_IP = self.params_dict.get("Telnet_IP") 201 tel_port = self.params_dict.get("Telnet_Port") 202 tftp_ip = self.params_dict.get("Tftpserver_IP") 203 device_ip = self.params_dict.get("Device_IP") 204 device_Mac = self.params_dict.get("Device_MAC") 205 device_netmask = self.params_dict.get("Device_Netmask") 206 device_gatewayip = self.params_dict.get("Device_GatewayIP") 207 #芯片类型,根据芯片类型获取对应的刷机命令 208 flash_type = self.params_dict.get("flash_type") 209 #串口服务器命令端口 210 tel_cmd_port = self.params_dict.get("cmd_port") 211 #单板的电源端口 212 power_port = self.params_dict.get("board_power_port") 213 tel_username = self.params_dict.get("server_cmd_username") 214 tel_passwd = self.params_dict.get("server_cmd_password") 215 216 if not tel_IP or not tel_port: 217 logger.error("Telnet_IP or Telnet_Port is NULL !!") 218 return False 219 if not tftp_ip: 220 tftp_ip = getHostIp() 221 logger.info("get host ip, tftp_ip is %s" % tftp_ip) 222 if not device_netmask: 223 device_netmask = "255.255.252.0" 224 if not device_Mac: 225 device_Mac = "3a:82:d0:08:f4:99" 226 if not tel_cmd_port: 227 tel_cmd_port = "18984" 228 local_image_path = "%s/%s" % (version_name, "img") 229 if self.params_dict.get("upgrade_script"): 230 script_name = self.params_dict.get("upgrade_script").split('\\')[-1] 231 scriptfile = os.path.join(version_savepath, "scriptfile", script_name) 232 else: 233 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 234 if "ev" in flash_type: 235 scriptfile = os.path.join(scriptpath, "resource", "L1", "ev200", "update.txt") 236 elif "dv" in flash_type: 237 scriptfile = os.path.join(scriptpath, "resource", "L1", "dv300", "update.txt") 238 logger.info("upgrade scriptfile is: %s" % scriptfile) 239 if not os.path.exists(scriptfile): 240 logger.error("%s is not exit" % scriptfile) 241 return False 242 243 tnc = TelConnect(tel_IP, tel_port) 244 try: 245 board_type = tnc.getBoardType(cmd_finish, READ_MINITIMEOUT) 246 if board_type != "uboot": 247 if not tnc.sendResetCmd(uboot_finish, READ_TIMEOUT): 248 #上下电恢复单板 249 if power_port: 250 if tel_passwd: 251 if not recoveryBoard(tnc, tel_IP, tel_cmd_port, power_port, tel_username, tel_passwd): 252 logger.error("recovery board fail, please check board") 253 return False 254 else: 255 if not recoveryBoard(tnc, tel_IP, tel_cmd_port, power_port, tel_username): 256 logger.error("recovery board fail, please check board") 257 return False 258 else: 259 logger.error("go hisilicon # fail, please check board status") 260 return False 261 262 with open(scriptfile, "r") as fp: 263 lines = fp.readlines() 264 for line in lines: 265 line = line.strip() 266 if not line: 267 logger.info("cmd is: %s " % line) 268 continue 269 if "%Tftpserver_IP%" in line: 270 line = line.replace("%Tftpserver_IP%", tftp_ip) 271 if "%Device_IP%" in line: 272 line = line.replace("%Device_IP%", device_ip) 273 if "%Device_MAC%" in line: 274 line = line.replace("%Device_MAC%", device_Mac) 275 if "%Device_Netmask%" in line: 276 line = line.replace("%Device_Netmask%", device_netmask) 277 if "%Device_GatewayIP%" in line: 278 line = line.replace("%Device_GatewayIP%", device_gatewayip) 279 if "userfs.img" in line: 280 if not os.path.exists(os.path.join(version_savepath, "img", "userfs.img")): 281 line = line.replace("userfs.img", "userfs_jffs2.img") 282 if "rootfs.img" in line: 283 if not os.path.exists(os.path.join(version_savepath, "img", "rootfs.img")): 284 line = line.replace("rootfs.img", "rootfs_jffs2.img") 285 if "tftp" in line: 286 packagefile_path = "/%s" % flash_type 287 new_packagefile_path = "/%s" % local_image_path 288 line = line.replace(packagefile_path, new_packagefile_path) 289 if not tnc.sendUpgradeCmd(line, uboot_finish, "done", READ_MAXTIMEOUT): 290 return False 291 continue 292 if "setenv" in line: 293 if not tnc.sendUpgradeCmd(line, uboot_finish, "", READ_MINITIMEOUT): 294 return False 295 continue 296 if "EXIT" in line: 297 break 298 if "go " in line: 299 if not tnc.sendUpgradeCmd(line, cmd_finish, "", READ_TIMEOUT): 300 logger.error("go OHOS # fail, please check board status") 301 return False 302 continue 303 if not tnc.sendUpgradeCmd(line, cmd_finish, "", READ_TIMEOUT): 304 return False 305 board_type = tnc.getBoardType(cmd_finish, READ_MINITIMEOUT) 306 if board_type == "OHOS": 307 logger.info("upgrade success") 308 return True 309 else: 310 logger.info("upgrade fail") 311 return False 312 except Exception as e: 313 logger.printLog(e) 314 return False 315 finally: 316 tnc.close() 317 logger.info("close telnet") 318 319def recoveryBoard(tnc, telip, cmdport, powerport, username, passwd=''): 320 logger.info("start recovery board") 321 # 先执行下电操作 322 if not serialPowerOnOff(telip, cmdport, powerport, "off", username, passwd): 323 logger.error("board power off failed") 324 return False 325 #再执行上电操作 326 if not serialPowerOnOff(telip, cmdport, powerport, "on", username, passwd): 327 logger.error("board power on failed") 328 return False 329 #再发送回车 330 if not tnc.sendEnterCmd(" #", READ_MINITIMEOUT): 331 logger.error("send Enter failed") 332 return False 333 board_type = tnc.getBoardType(cmd_finish, READ_MINITIMEOUT) 334 if board_type == "bootrom": 335 logger.error("recovery board failed") 336 return False 337 logger.info("recovery board succ") 338 return True 339 340 341if __name__ == "__main__": 342 param_file = sys.argv[1] 343 if not param_file: 344 logger.printLog("Missing params file") 345 sys.exit(-1) 346 try: 347 uphandle = liteOsUpgrade_L1(param_file) 348 uphandle._excuteApp() 349 except Exception as e: 350 logger.printLog(e) 351 sys.exit(-1) 352