1# -*- coding:utf-8 -*- 2import uuid 3import sys 4import subprocess 5import os 6import time 7import re 8import shutil 9import random 10 11from core.base import BaseApp, dec_stepmsg 12from util.file_locker import FileLock 13from util.log_info import logger 14from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 15from aw.Download.Download import * 16from aw.Common.Constant import CONSTANT 17from aw.Common.Common import getFileName 18from aw.ExtractFile.ExtractFile import * 19from aw.Common.Common import getHostIp, copyFile, copyDirectory 20total_time = "" 21lock_suffix = CONSTANT.File.LOCK_SUFFIX 22suc_file = CONSTANT.File.SUC_FILE 23failed_file = CONSTANT.File.FAILED_FILE 24REBOOT_TIMEOUT = 20000000 25 26 27 28class liteOsUpgrade_RK3568(BaseApp): 29 ''' 30 @author: cwx1076044 31 ''' 32 33 def __init__(self, param_file): 34 super().__init__(param_file) 35 self.param_List = ["upgrade_upgradeLocation", "sn"] 36 37 @dec_stepmsg("hongmeng RK3568 flash") 38 def excute(self): 39 ''' 40 #=================================================================================== 41 # @Method: excute(self) 42 # @Precondition: none 43 # @Func: 升级执行入口 44 # @PostStatus: none 45 # @eg: excute() 46 # @return: True or Flase 47 #=================================================================================== 48 ''' 49 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app") 50 51 # # 执行下载 52 # try: 53 # if not self.download(): 54 # CONSTANT.ENVERRMESSAGE = "image download fail" 55 # logger.printLog(CONSTANT.ENVERRMESSAGE) 56 # return 98 57 # except Exception as e: 58 # logger.error(e) 59 # #raise e 60 # return 98 61 62 # 执行升级 63 try: 64 return_code = self.upgrade() 65 if not return_code: 66 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 67 logger.printLog(CONSTANT.ENVERRMESSAGE) 68 return False 69 if return_code == 98: 70 return 98 71 if return_code == 99: 72 return 99 73 return True 74 except Exception as e: 75 logger.error(e) 76 raise e 77 78 79 @dec_stepmsg("upgrade") 80 @timeout(3600) 81 def upgrade(self): 82 ''' 83 #=================================================================================== 84 # @Method: upgrade(self) 85 # @Precondition: none 86 # @Func: 升级相关业务逻辑 87 # @PostStatus: none 88 # @eg: upgrade() 89 # @return: True or Flase 90 #=================================================================================== 91 ''' 92 global local_image_path, loader_tool_path, sn, LocationID ,test_num 93 version_savepath = self.params_dict.get("img_path") 94 upgrade_test_type = self.params_dict.get("UpgradeTestType") 95 sn = self.params_dict.get("sn") 96 LocationID = self.params_dict.get("LocationID") 97 test_num = self.params_dict.get("test_num") 98 pr_url = self.params_dict.get("pr_url") 99 logFilePath = self.logFilePath 100 logger.info(logFilePath) 101 r = logFilePath.rfind("\\") 102 report_path = logFilePath[:r] 103 logger.info(report_path) 104 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 105 logger.info(scriptpath) 106 local_image_path = os.path.join(version_savepath) 107 logger.info(local_image_path) 108 loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool.exe") 109 logger.info(loader_tool_path) 110 mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py") 111 archive_path = os.path.join(version_savepath) 112 if not self.check_devices_mode(): 113 check_devices_cmd = "hdc list targets" 114 f = send_times(check_devices_cmd) 115 logger.info(f) 116 if not f or "Empty" in f: 117 logger.error("No devices found,please check the device.") 118 return False 119 else: 120 logger.info("3568 board is connected.") 121 return self.check_devices_mode() 122 else: 123 # 下載鏡像 124 upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path) 125 h = sendCmd(upgrde_loader_cmd) 126 logger.info(h) 127 if "Upgrade loader ok" not in h: 128 logger.error("Download MiniLoaderAll.bin Fail!") 129 return False 130 else: 131 logger.printLog("Download MiniLoaderAll.bin Success!") 132 # time.sleep(3) 133 write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path) 134 j = sendCmd(write_gpt_cmd) 135 logger.info(j) 136 if "Write gpt ok" not in j: 137 logger.error("Failed to execute the parameter.txt") 138 return False 139 else: 140 logger.printLog("Successfully executed parameter.txt.") 141 # time.sleep(5) 142 download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % ( 143 loader_tool_path, LocationID, local_image_path, local_image_path) 144 k = sendCmd(download_uboot_cmd) 145 logger.info(k) 146 if "Download image ok" not in k: 147 logger.error("Failed to download the uboot.image!") 148 if self.check_devices_mode(): 149 return 98 150 return False 151 else: 152 logger.printLog("The uboot.image downloaded successfully!") 153 # time.sleep(5) 154 if not self.flash_version(): 155 return False 156 reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID) 157 reboot_result = sendCmd(reboot_devices_cmd) 158 logger.info(reboot_result) 159 time.sleep(30) 160 try: 161 if upgrade_test_type != "mini_system_test": 162 if not start_cmd(sn): 163 if self.check_devices_mode(): 164 return 98 165 return False 166 except Exception as t: 167 logger.info(t) 168 if self.check_devices_mode(): 169 return 98 170 return False 171 time.sleep(10) 172 if "Reset Device OK" not in reboot_result: 173 logger.error("Failed to reboot the board!") 174 return False 175 else: 176 logger.info("Reboot successfully!") 177 logger.printLog("******下载完成,升级成功,开始进行冒烟测试******") 178 os.system("hdc_std -t %s shell hilog -w start" % sn) 179 os.system("hdc_std -t %s shell hilog -w start -t kmsg" % sn) 180 if upgrade_test_type == "null": 181 return True 182 screenshot_path = os.path.join(local_image_path, "screenshot") 183 184 resource_path = os.path.join(screenshot_path, 'xdevice_smoke') 185 logger.info(resource_path) 186 # py_path = os.path.join(screenshot_path, "main.py") 187 py_path = "main.py" 188 new_report_path = os.path.join(report_path, "result") 189 logger.info(new_report_path) 190 time_sleep = random.randint(1, 5) 191 time.sleep(time_sleep) 192 try: 193 if not os.path.exists(new_report_path): 194 os.mkdir(new_report_path) 195 except Exception as e: 196 logger.error(e) 197 return 98 198 if upgrade_test_type == "mini_system_test": 199 save_path = os.path.join(new_report_path) 200 if exec_cmd(mini_path, sn, save_path, archive_path) == 98: 201 return 98 202 return True 203 204 if not upgrade_test_type or upgrade_test_type == "smoke_test": 205 # 进到工程目录 206 cur_path = os.getcwd() 207 os.chdir(resource_path) 208 test_return = cmd_test(resource_path, py_path, new_report_path, resource_path, sn, test_num, pr_url) 209 # 执行完回到原来的目录 210 os.chdir(cur_path) 211 if test_return == 1: 212 return True 213 if test_return == 98: 214 return 98 215 if test_return == 99: 216 return 99 217 else: 218 return False 219 220 @timeout(1000) 221 def flash_version(self): 222 partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"] 223 for i in partList: 224 if not os.path.exists("%s/%s.img" % (local_image_path, i)): 225 logger.printLog("%s.img is not exist, ignore" % i) 226 continue 227 loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i) 228 p = sendCmd(loadcmd) 229 logger.info(p) 230 # time.sleep(5) 231 if "Download image ok" not in p: 232 logger.info("try download %s again!" % i) 233 time.sleep(1) 234 second_cmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i) 235 f = sendCmd(second_cmd) 236 logger.info(f) 237 if "Download image ok" not in f: 238 logger.printLog("Failed to download the %s.img!" % i) 239 if self.check_devices_mode(): 240 return 98 241 else: 242 return False 243 return True 244 else: 245 logger.printLog("The %s.img downloaded successfully!" % i) 246 return True 247 248 @timeout(120) 249 def check_devices_mode(self): 250 check_times = 0 251 while check_times < 5: 252 check_mode_cmd = "%s LD" % loader_tool_path 253 g = sendCmd(check_mode_cmd) 254 logger.info(g) 255 #time.sleep(40) 256 if "LocationID=%s Mode=Loader" % LocationID in g: 257 logger.info("3568 board has entered the Loader mode successfully!") 258 return True 259 else: 260 #if test_num != "2/2": 261 # hdc_kill() 262 os.system("hdc -t %s shell reboot loader" % sn) 263 time.sleep(5) 264 check_times += 1 265 logger.error("Failed to enter the loader mode!") 266 return False 267 268 # @dec_stepmsg("download") 269 # @timeout(360) 270 # def download(self): 271 # ''' 272 # #=================================================================================== 273 # # @Method: download(self) 274 # # @Precondition: none 275 # # @Func: 构建下载到本地的路径,执行相应包的下载 276 # # @PostStatus: none 277 # # @eg: download() 278 # # @return: True or Flase 279 # #=================================================================================== 280 # ''' 281 # global version_savepath, version_name 282 # dir_path = CONSTANT.Path.getDirPath() 283 # if self.params_dict.get("pbiid"): 284 # version_path = self.params_dict.get("pbiid") 285 # version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 286 # version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 287 # else: 288 # version_path = self.params_dict.get("upgrade_upgradeLocation") 289 # version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 290 # version_savepath = os.path.join(dir_path, version_name, "img") 291 # # 执行img下载 292 # 293 # if self.params_dict.get("isDownload") == "True": 294 # logger.printLog("不需要做下载,直接返回") 295 # return True 296 # 297 # import hashlib 298 # save_file_str = version_path.replace("/", "").replace("\\", "") 299 # save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 300 # logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name)) 301 # save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 302 # logger.info(version_savepath) 303 # logger.info(save_path_file) 304 # t = version_savepath[:-4] 305 # logger.info(t) 306 # if not self.excutedown(version_path, version_savepath, save_path_file, False): 307 # logger.info("download again!") 308 # try: 309 # if os.path.exists(t): 310 # shutil.rmtree(t) 311 # logger.info("remove dir succeed") 312 # if os.path.exists(save_path_file): 313 # logger.info("remove file succeed") 314 # os.remove(save_path_file) 315 # except Exception as p: 316 # logger.error(p) 317 # raise Exception(p) 318 # time.sleep(15) 319 # if not self.excutedown(version_path, version_savepath, save_path_file, False): 320 # logger.error("download img fail") 321 # return False 322 # 323 # # 保存本地版本路径给devicetest去版本路径下取用例 324 # saveVersion(save_path_file, version_savepath) 325 # return True 326 # 327 # def excutedown(self, source_path, download_dir, suc_mark, is_file): 328 # ''' 329 # #=================================================================================== 330 # # @Method: excutedown(source_path, download_dir, suc_mark, is_file) 331 # # @Precondition: none 332 # # @Func: 执行下载动作 333 # # @PostStatus: none 334 # # @Param: source_path:资源文件路径 335 # # download_dir:文件下载到本地的文件夹路径 336 # # is_file:是否是文件 337 # # @eg: excutedown("xxxx", "D:\\local\\image", suc_mark, Flase) 338 # # @return: True or Flase 339 # #=================================================================================== 340 # ''' 341 # failed_mark = os.path.join(download_dir, failed_file) 342 # lock_path = os.path.join(download_dir, lock_suffix) 343 # file_lock = FileLock() 344 # 345 # if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 346 # return True 347 # try: 348 # nowtime = get_now_time_str_info() 349 # logger.printLog("%s Downloading, please wait" % nowtime) 350 # file_lock.lockFile(lock_path) 351 # ret = "" 352 # logger.info("Get lock. Start to ") 353 # try: 354 # if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 355 # ret = downloadByBitComet(source_path, download_dir, os_method) 356 # elif source_path.startswith('\\\\'): 357 # ret = downloadByCopy(source_path, download_dir, is_file) 358 # elif self.params_dict.get("pbiid"): 359 # ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", 360 # self.params_dict.get("pbiid")) 361 # elif source_path.startswith("http"): 362 # ret = run_download(source_path, download_dir) 363 # except Exception as f: 364 # logger.error(f) 365 # 366 # if source_path.endswith(".zip"): 367 # zip_name = os.path.basename(source_path) 368 # ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 369 # if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 370 # if source_path.startswith("http") and ("file_id=" in source_path): 371 # if source_path.endswith(".tar.gz"): 372 # zip_name = source_path.split('=')[-1] 373 # else: 374 # zip_name = "out.tar.gz" 375 # else: 376 # zip_name = os.path.basename(source_path) 377 # ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 378 # nowtime = get_now_time_str_info() 379 # logger.printLog("%s download to %s end" % (nowtime, download_dir)) 380 # 381 # if not ret: 382 # with open(failed_mark, "a+") as fp: 383 # fp.write("") 384 # return ret 385 # except Exception as e: 386 # logger.printLog(e) 387 # #raise Exception(e) 388 # finally: 389 # file_lock.releaseFile() 390 391 392@timeout(30) 393def hdc_kill(): 394 logger.info("kill the process") 395 os.system("hdc kill") 396 time.sleep(2) 397 logger.info("start the process") 398 os.system("hdc -l5 start") 399 # time.sleep(10) 400 401 402def sendCmd(mycmd): 403 result = "".join(os.popen(mycmd).readlines()) 404 return result 405 406 407def send_times(mycmd): 408 times = 0 409 outcome = sendCmd(mycmd) 410 while times < 3: 411 if not outcome or "Empty" in outcome: 412 times += 1 413 time.sleep(3) 414 else: 415 time.sleep(3) 416 return outcome 417 return outcome 418 419 420@timeout(180) 421def start_cmd(sn): 422 try: 423 os.system("hdc -l5 start") 424 power_cmd = "hdc -t %s shell \"power-shell setmode 602\"" % sn 425 hilog_cmd = "hdc -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn 426 logger.info(power_cmd) 427 logger.info(hilog_cmd) 428 power_result = sendCmd(power_cmd) 429 logger.info(power_result) 430 if not power_result: 431 return False 432 number = 0 433 while "Set Mode Success" not in power_result and number < 30: 434 time.sleep(4) 435 power_result = sendCmd(power_cmd) 436 logger.info(power_result) 437 number += 1 438 if number >= 20: 439 logger.error("Set mode failed") 440 return False 441 hilog_result = sendCmd(hilog_cmd) 442 logger.info(hilog_result) 443 return True 444 except Exception as e: 445 logger.error(e) 446 return False 447 448 449@timeout(900) 450def cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url): 451 global total_time 452 save_screenshot_path = os.path.join(new_report_path, "screenshot_result") 453 logger.info(save_screenshot_path) 454 time_sleep = random.randint(1, 5) 455 time.sleep(time_sleep) 456 try: 457 if not os.path.exists(save_screenshot_path): 458 os.mkdir(save_screenshot_path) 459 logger.info(save_screenshot_path) 460 base_screenshot_path = os.path.join(new_report_path, "screenshot_base") 461 if not os.path.exists(base_screenshot_path): 462 os.mkdir(base_screenshot_path) 463 logger.info(base_screenshot_path) 464 except Exception as e: 465 logger.error(e) 466 return 98 467 config_path = os.path.join(screenshot_path, "resource", "app_capture_screen_test_config.json") 468 py_cmd = "python %s --config %s --anwser_path %s --save_path %s --device_num %s --test_num %s --tools_path %s --pr_url %s" \ 469 % (py_path, config_path, resource_path, save_screenshot_path, sn, test_num, screenshot_path, pr_url) 470 time1 = time.time() 471 result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, resource_path) 472 time2 = time.time() 473 total_time = int(time2 - time1) 474 logger.info("total_time: %s" % total_time) 475 if result == 1: 476 return True 477 if result == 98: 478 return 98 479 if result == 99: 480 return 99 481 else: 482 return False 483 484 485@timeout(900) 486def outCmd(cmd, save_screenshot_path, base_screenshot_path, resource_path): 487 logger.info("cmd is: %s" % cmd) 488 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk") 489 curline = p.stdout.readline() 490 list_png_name = [] 491 try: 492 while "End of check" not in curline: 493 curline = p.stdout.readline() 494 logger.info(curline) 495 if "abnarmal" in curline: 496 png_name = curline.split(" ")[3].split(".")[0] 497 list_png_name.append(png_name) 498 if "SmokeTest find some fatal problems" in curline: 499 logger.error("SmokeTest find some fatal problems!") 500 return 99 501 except Exception as e: 502 logger.error(e) 503 logger.error("execute smoke_test.py failed!") 504 return 99 505 l = list(set(list_png_name)) 506 if l: 507 logger.error(l) 508 try: 509 for i in l: 510 result = os.path.join(resource_path, "%s.jpeg" % i) 511 base = os.path.join(base_screenshot_path, "%s.jpeg" % i) 512 shutil.copy(result, base) 513 except Exception as t: 514 logger.info(t) 515 p.wait() 516 logger.info("p.returncode %s" % p.returncode) 517 if p.returncode == 0: 518 logger.info("screenshot check is ok!") 519 return True 520 if p.returncode == 101: 521 logger.error("device disconnection, please check the device!") 522 return False 523 logger.error("screenshot test failed, check the %s" % save_screenshot_path) 524 return 98 525 526 527@timeout(1000) 528def exec_cmd(mini_path, sn, save_path, archive_path): 529 cmd = "python %s --device_num %s --save_path %s --archive_path %s" % (mini_path, sn, save_path, archive_path) 530 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk") 531 curline = p.stdout.readline() 532 try: 533 while "End of check" not in curline: 534 curline = p.stdout.readline() 535 logger.info(curline) 536 except Exception as e: 537 logger.error(e) 538 p.wait() 539 logger.info("p.returncode %s" % p.returncode) 540 if p.returncode == 0: 541 logger.info("mini_system_test is ok!") 542 return True 543 logger.error("mini_system_test failed!") 544 return 98