# -*- coding:utf-8 -*-
import uuid
import sys
import subprocess
import os
import time
import re
import shutil
import random


from core.base import BaseApp, dec_stepmsg
from util.file_locker import FileLock
from util.log_info import logger
from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
from aw.Download.Download import *
from aw.Common.Constant import CONSTANT
from aw.Common.Common import getFileName
from aw.ExtractFile.ExtractFile import *
from aw.Common.Common import getHostIp, copyFile, copyDirectory

lock_suffix = CONSTANT.File.LOCK_SUFFIX
suc_file = CONSTANT.File.SUC_FILE
failed_file = CONSTANT.File.FAILED_FILE
REBOOT_TIMEOUT = 20000000


class liteOsUpgrade_RK3568(BaseApp):
    '''
    Download an RK3568 image set, flash it with upgrade_tool.exe and
    optionally run the smoke / mini-system tests against the board.

    @author: cwx1076044
    '''

    def __init__(self, param_file):
        super().__init__(param_file)
        self.param_List = ["upgrade_upgradeLocation", "sn"]

    @dec_stepmsg("hongmeng RK3568 flash")
    def excute(self):
        '''
        Entry point of the step: download the images, then flash the board.

        @return: True on success, False on failure, or the special codes
                 98 / 99 passed through unchanged from download() / upgrade().
        @raise: any exception raised by download() / upgrade() is logged and
                re-raised.
        '''
        # Kept for its side effect: raises ValueError when this step is not
        # part of the configured step list.
        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app")

        # Run the download.
        try:
            download_code = self.download()
            if not download_code:
                CONSTANT.ENVERRMESSAGE = "image download fail"
                logger.printLog(CONSTANT.ENVERRMESSAGE)
                return False
            # download() signals failure with 98, which is truthy; without
            # this check a failed download would go on to flash the board.
            if download_code == 98:
                return 98
            if download_code == 99:
                return 99
        except Exception as e:
            logger.error(e)
            raise

        # Run the upgrade.
        try:
            return_code = self.upgrade()
            if not return_code:
                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
                logger.printLog(CONSTANT.ENVERRMESSAGE)
                return False
            if return_code == 98:
                return 98
            if return_code == 99:
                return 99
            return True
        except Exception as e:
            logger.error(e)
            raise

    @dec_stepmsg("upgrade")
    @timeout(3600)
    def upgrade(self):
        '''
        Flash the downloaded images onto the board and run the requested test.

        Publishes local_image_path, loader_tool_path, sn, LocationID and
        test_num as module globals because flash_version() and
        check_devices_mode() read them. Requires download() to have run first
        (it sets the global version_savepath).

        @return: True on success, False on failure, 98 / 99 as special result
                 codes, or None when "UpgradeTestType" is not one of the
                 handled values.
        '''
        global local_image_path, loader_tool_path, sn, LocationID, test_num
        upgrade_test_type = self.params_dict.get("UpgradeTestType")
        sn = self.params_dict.get("sn")
        LocationID = self.params_dict.get("LocationID")
        test_num = self.params_dict.get("test_num")
        pr_url = self.params_dict.get("pr_url")

        logFilePath = self.logFilePath
        logger.info(logFilePath)
        # The report directory is everything before the last backslash of the
        # log file path (Windows-style path expected).
        r = logFilePath.rfind("\\")
        report_path = logFilePath[:r]
        logger.info(report_path)

        # Four directory levels above this file.
        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
        logger.info(scriptpath)
        local_image_path = os.path.join(version_savepath)
        logger.info(local_image_path)
        loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool.exe")
        logger.info(loader_tool_path)
        mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py")
        archive_path = os.path.join(version_savepath)

        if not self.check_devices_mode():
            check_devices_cmd = "hdc_std list targets"
            targets = send_times(check_devices_cmd)
            logger.info(targets)
            if not targets or "Empty" in targets:
                logger.error("No devices found,please check the device.")
                return False
            logger.info("3568 board is connected.")
            # NOTE(review): when this second check succeeds the method returns
            # True without flashing anything - confirm this is intended.
            return self.check_devices_mode()

        # Flash the loader.
        upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path)
        loader_output = sendCmd(upgrde_loader_cmd)
        logger.info(loader_output)
        if "Upgrade loader ok" not in loader_output:
            logger.error("Download MiniLoaderAll.bin Fail!")
            return False
        logger.printLog("Download MiniLoaderAll.bin Success!")

        # Write the partition table.
        write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path)
        gpt_output = sendCmd(write_gpt_cmd)
        logger.info(gpt_output)
        if "Write gpt ok" not in gpt_output:
            logger.error("Failed to execute the parameter.txt")
            return False
        logger.printLog("Successfully executed parameter.txt.")

        # Flash uboot.
        download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % (
            loader_tool_path, LocationID, local_image_path, local_image_path)
        uboot_output = sendCmd(download_uboot_cmd)
        logger.info(uboot_output)
        if "Download image ok" not in uboot_output:
            logger.error("Failed to download the uboot.image!")
            if self.check_devices_mode():
                return 98
            return False
        logger.printLog("The uboot.image downloaded successfully!")

        # Flash the remaining partitions. flash_version() reports 98 (truthy)
        # for one of its failure modes, so it must be checked explicitly.
        flash_result = self.flash_version()
        if flash_result == 98:
            return 98
        if not flash_result:
            return False

        reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID)
        reboot_result = sendCmd(reboot_devices_cmd)
        logger.info(reboot_result)
        time.sleep(30)
        try:
            if upgrade_test_type != "mini_system_test":
                if not start_cmd(sn):
                    if self.check_devices_mode():
                        return 98
                    return False
        except Exception as t:
            logger.info(t)
            if self.check_devices_mode():
                return 98
            return False
        time.sleep(10)
        if "Reset Device OK" not in reboot_result:
            logger.error("Failed to reboot the board!")
            return False
        logger.info("Reboot successfully!")
        logger.printLog("******下载完成,升级成功,开始进行冒烟测试******")
        if upgrade_test_type == "null":
            return True

        screenshot_path = os.path.join(local_image_path, "screenshot")
        resource_path = os.path.join(screenshot_path, "resource")
        logger.info(resource_path)
        py_path = os.path.join(resource_path, "capturescreentest.py")
        new_report_path = os.path.join(report_path, "result")
        logger.info(new_report_path)
        time_sleep = random.randint(1, 5)
        time.sleep(time_sleep)
        try:
            # exist_ok avoids failing when another run creates the directory
            # between an existence check and the creation.
            os.makedirs(new_report_path, exist_ok=True)
        except Exception as e:
            logger.error(e)
            return 98

        if upgrade_test_type == "mini_system_test":
            save_path = os.path.join(new_report_path)
            if exec_cmd(mini_path, sn, save_path, archive_path) == 98:
                return 98
            return True
        if not upgrade_test_type or upgrade_test_type == "smoke_test":
            test_return = cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url)
            if test_return == 1:
                return True
            if test_return == 98:
                return 98
            if test_return == 99:
                return 99
            return False
        # Any other test type: no test is run and nothing is reported.
        return None

    @timeout(1000)
    def flash_version(self):
        '''
        Flash every partition image that exists in local_image_path.

        Each image gets one retry. Missing image files are skipped.

        @return: True when all present images were flashed; after a second
                 failure, 98 if check_devices_mode() succeeds, else False.
        '''
        partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"]
        for part in partList:
            if not os.path.exists("%s/%s.img" % (local_image_path, part)):
                logger.printLog("%s.img is not exist, ignore" % part)
                continue
            loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, part, local_image_path, part)
            first_output = sendCmd(loadcmd)
            logger.info(first_output)
            if "Download image ok" not in first_output:
                logger.info("try download %s again!" % part)
                time.sleep(1)
                second_output = sendCmd(loadcmd)
                logger.info(second_output)
                if "Download image ok" not in second_output:
                    logger.printLog("Failed to download the %s.img!" % part)
                    if self.check_devices_mode():
                        return 98
                    return False
                # A successful retry must fall through to the next partition;
                # returning here would leave the remaining images unflashed.
            logger.printLog("The %s.img downloaded successfully!" % part)
        return True

    @timeout(120)
    def check_devices_mode(self):
        '''
        Check, up to five times, whether the board is listed in Loader mode.

        After each failed check the board is asked through hdc_std to reboot
        into the loader, followed by a 5 second wait. Reads the module globals
        loader_tool_path, LocationID and sn set by upgrade().

        @return: True once the board is listed in Loader mode, else False.
        '''
        for _ in range(5):
            check_mode_cmd = "%s LD" % loader_tool_path
            listing = sendCmd(check_mode_cmd)
            logger.info(listing)
            if "LocationID=%s Mode=Loader" % LocationID in listing:
                logger.info("3568 board has entered the Loader mode successfully!")
                return True
            os.system("hdc_std -t %s shell reboot loader" % sn)
            time.sleep(5)
        logger.error("Failed to enter the loader mode!")
        return False

    @dec_stepmsg("download")
    @timeout(360)
    def download(self):
        '''
        Build the local save path for the image set and download it.

        Sets the module globals version_savepath and version_name, which
        upgrade() and excutedown() read.

        @return: True on success (or when "isDownload" is "True" and the
                 download is skipped), 98 when the download failed.
        '''
        global version_savepath, version_name
        dir_path = CONSTANT.Path.getDirPath()
        if self.params_dict.get("pbiid"):
            version_path = self.params_dict.get("pbiid")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
            version_savepath = os.path.join(dir_path, version_name, "img")

        # "isDownload" == "True" means the download is to be skipped.
        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            return True

        import hashlib
        # The record file name is the SHA-1 of the source path with all path
        # separators removed.
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        if not self.excutedown(version_path, version_savepath, save_path_file, False):
            logger.error("download img fail")
            return 98

        # Record the local version path so devicetest can fetch the test cases
        # from it.
        saveVersion(save_path_file, version_savepath)
        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        '''
        Download (and unpack, if archived) the image set into download_dir.

        The work is done while holding a file lock inside download_dir. If the
        first download attempt raises, it is retried once through
        run_download() after 20 seconds.

        @param source_path: location of the resource to fetch
        @param download_dir: local directory the files are downloaded into
        @param suc_mark: marker passed to isDownLoadSuccess()
        @param is_file: whether source_path is a single file (forwarded to
                        downloadByCopy())
        @return: True if already downloaded; otherwise the result of the last
                 download / extraction call; False if an exception was caught.
        '''
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            try:
                if self.params_dict.get("bt_enable") == "True":
                    # NOTE(review): os_method is not defined in this file;
                    # presumably it comes from one of the star imports - verify.
                    ret = downloadByBitComet(source_path, download_dir, os_method)
                elif source_path.startswith('\\\\'):
                    ret = downloadByCopy(source_path, download_dir, is_file)
                elif self.params_dict.get("pbiid"):
                    ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT",
                                                 self.params_dict.get("pbiid"))
                elif source_path.startswith("http"):
                    ret = run_download(source_path, download_dir)
            except Exception as f:
                logger.error(f)
                logger.info("下载失败,间隔20秒,尝试再次下载。")
                time.sleep(20)
                ret = run_download(source_path, download_dir)

            is_file_id_url = source_path.startswith("http") and ("file_id=" in source_path)
            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
            if source_path.endswith(".tar.gz") or is_file_id_url:
                if is_file_id_url:
                    if source_path.endswith(".tar.gz"):
                        # The archive name is whatever follows the last "=".
                        zip_name = source_path.split('=')[-1]
                    else:
                        zip_name = "out.tar.gz"
                else:
                    zip_name = os.path.basename(source_path)
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # Create (or leave) an empty marker file for the failure.
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            # Best effort: report the failure to the caller instead of raising.
            logger.printLog(e)
            return False
        finally:
            file_lock.releaseFile()


@timeout(30)
def hdc_kill():
    '''Kill the hdc_std process, wait two seconds and start it again.'''
    logger.info("kill the process")
    os.system("hdc_std kill")
    time.sleep(2)
    logger.info("start the process")
    os.system("hdc_std -l5 start")


def sendCmd(mycmd):
    '''
    Run a shell command and return everything it wrote to stdout.

    @param mycmd: command line to run
    @return: the command's standard output as one string
    '''
    # The context manager closes the pipe once the output has been read.
    with os.popen(mycmd) as pipe:
        return pipe.read()


def send_times(mycmd):
    '''
    Run a command, re-running it up to three more times while its output is
    empty or contains "Empty".

    A 3 second pause follows every attempt that is checked.

    @param mycmd: command line to run
    @return: the output of the last attempt
    '''
    outcome = sendCmd(mycmd)
    for _ in range(3):
        if outcome and "Empty" not in outcome:
            time.sleep(3)
            return outcome
        time.sleep(3)
        # Re-send the command; re-checking the first output can never succeed.
        outcome = sendCmd(mycmd)
    return outcome


@timeout(180)
def start_cmd(sn):
    '''
    Start hdc_std, set the power mode on the device and start hilog.

    The power-mode command is retried every 4 seconds, at most 20 times,
    until its output contains "Set Mode Success".

    @param sn: serial number of the target device
    @return: True on success; False if the first output is empty, the retries
             are exhausted, or an exception occurs.
    '''
    max_retries = 20
    try:
        os.system("hdc_std -l5 start")
        power_cmd = "hdc_std -t %s shell \"power-shell setmode 602\"" % sn
        hilog_cmd = "hdc_std -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn
        logger.info(power_cmd)
        logger.info(hilog_cmd)
        power_result = sendCmd(power_cmd)
        logger.info(power_result)
        if not power_result:
            return False
        retries = 0
        while "Set Mode Success" not in power_result:
            if retries >= max_retries:
                logger.error("Set mode failed")
                return False
            time.sleep(4)
            power_result = sendCmd(power_cmd)
            logger.info(power_result)
            retries += 1
        hilog_result = sendCmd(hilog_cmd)
        logger.info(hilog_result)
        return True
    except Exception as e:
        logger.error(e)
        return False


@timeout(3600)
def cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url):
    '''
    Run the screenshot smoke-test script against the device.

    @param screenshot_path: directory passed to the script as --tools_path
    @param py_path: path of the test script to run
    @param new_report_path: directory in which the result folders are created
    @param resource_path: directory passed to the script as --anwser_path
    @param sn: device serial number
    @param test_num: value passed to the script as --test_num
    @param pr_url: value passed to the script as --pr_url
    @return: True on success, 98 / 99 as special result codes, else False.
    '''
    save_screenshot_path = os.path.join(new_report_path, "screenshot_result")
    logger.info(save_screenshot_path)
    time_sleep = random.randint(1, 5)
    time.sleep(time_sleep)
    base_screenshot_path = os.path.join(new_report_path, "screenshot_base")
    try:
        # exist_ok avoids failing when another run creates the directory
        # between an existence check and the creation.
        os.makedirs(save_screenshot_path, exist_ok=True)
        logger.info(save_screenshot_path)
        os.makedirs(base_screenshot_path, exist_ok=True)
        logger.info(base_screenshot_path)
    except Exception as e:
        logger.error(e)
        return 98
    config_path = os.path.join(screenshot_path, "resource", "app_capture_screen_test_config.json")
    py_cmd = "python %s --config %s --anwser_path %s --save_path %s --device_num %s --test_num %s --tools_path %s --pr_url %s" \
             % (py_path, config_path, resource_path, save_screenshot_path, sn, test_num, screenshot_path, pr_url)
    result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, resource_path)
    if result == 1:
        return True
    if result == 98:
        return 98
    if result == 99:
        return 99
    return False


@timeout(3600)
def outCmd(cmd, save_screenshot_path, base_screenshot_path, resource_path):
    '''
    Run the smoke-test command and interpret its output and exit code.

    Output is read line by line until a line containing "End of check" or the
    end of the stream. For every line containing "abnarmal" a picture name is
    parsed out, and that picture is copied from resource_path to
    base_screenshot_path afterwards.

    @param cmd: command line to run
    @param save_screenshot_path: only used in the failure log message
    @param base_screenshot_path: destination of the copied pictures
    @param resource_path: source directory of the copied pictures
    @return: True if the exit code is 0, False if it is 101, 98 for any other
             exit code, 99 on a fatal-problem line or a read/parse error.
    '''
    logger.info("cmd is: %s" % cmd)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk")
    abnormal_names = set()
    try:
        # readline() returns "" at end of stream; stopping there keeps the loop
        # from spinning when the child exits without printing "End of check".
        for curline in iter(p.stdout.readline, ""):
            logger.info(curline)
            if "abnarmal" in curline:
                # Fourth space-separated field, without its extension.
                abnormal_names.add(curline.split(" ")[3].split(".")[0])
            if "SmokeTest find some fatal problems" in curline:
                logger.error("SmokeTest find some fatal problems!")
                return 99
            if "End of check" in curline:
                break
    except Exception as e:
        logger.error(e)
        logger.error("execute smoke_test.py failed!")
        return 99
    png_names = sorted(abnormal_names)
    if png_names:
        logger.error(png_names)
        try:
            for png_name in png_names:
                result = os.path.join(resource_path, "%s.png" % png_name)
                base = os.path.join(base_screenshot_path, "%s.png" % png_name)
                shutil.copy(result, base)
        except Exception as t:
            # Best effort: a failed copy must not change the test verdict.
            logger.info(t)
    p.wait()
    logger.info("p.returncode %s" % p.returncode)
    if p.returncode == 0:
        logger.info("screenshot check is ok!")
        return True
    if p.returncode == 101:
        logger.error("device disconnection, please check the device!")
        return False
    logger.error("screenshot test failed, check the %s" % save_screenshot_path)
    return 98


@timeout(1000)
def exec_cmd(mini_path, sn, save_path, archive_path):
    '''
    Run the mini-system test script and report whether it passed.

    Output is logged line by line until a line containing "End of check" or
    the end of the stream.

    @param mini_path: path of the test script to run
    @param sn: device serial number
    @param save_path: value passed to the script as --save_path
    @param archive_path: value passed to the script as --archive_path
    @return: True if the script exits with code 0, else 98.
    '''
    cmd = "python %s --device_num %s --save_path %s --archive_path %s" % (mini_path, sn, save_path, archive_path)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk")
    try:
        # readline() returns "" at end of stream; stopping there keeps the loop
        # from spinning when the child exits without printing "End of check".
        for curline in iter(p.stdout.readline, ""):
            logger.info(curline)
            if "End of check" in curline:
                break
    except Exception as e:
        logger.error(e)
    p.wait()
    logger.info("p.returncode %s" % p.returncode)
    if p.returncode == 0:
        logger.info("mini_system_test is ok!")
        return True
    logger.error("mini_system_test failed!")
    return 98