# -*- coding:utf-8 -*-
import traceback
import uuid
import sys
import subprocess
import os
import time
import re
import shutil
import random
import platform
import socket

from core.base import BaseApp, dec_stepmsg
from util.file_locker import FileLock
from util.log_info import logger
from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
from aw.Download.Download import *
from aw.Common.Constant import CONSTANT
from aw.Common.Common import getFileName
from aw.ExtractFile.ExtractFile import *
from aw.Common.Common import getHostIp, copyFile, copyDirectory

total_time = ""
lock_suffix = CONSTANT.File.LOCK_SUFFIX
suc_file = CONSTANT.File.SUC_FILE
failed_file = CONSTANT.File.FAILED_FILE
REBOOT_TIMEOUT = 20000000


class liteOsUpgrade_RK3568(BaseApp):
    '''
    Flash an RK3568 board with the bundled upgrade_tool and then run the
    configured post-flash test (mini system test or smoke test).

    upgrade() publishes its working values (image path, tool path, serial
    number, location id, ...) as module-level globals; flash_version() and
    check_devices_mode() read them, so upgrade() must have started first.

    @author: cwx1076044
    '''

    def __init__(self, param_file):
        super().__init__(param_file)
        # NOTE(review): presumably the parameters BaseApp expects for this
        # step -- nothing in this file reads param_List, confirm in BaseApp.
        self.param_List = ["upgrade_upgradeLocation", "sn"]

    @dec_stepmsg("hongmeng RK3568 flash")
    def excute(self):
        '''
        #===================================================================================
        #   @Method:        excute(self)
        #   @Precondition:  none
        #   @Func:          Entry point of the upgrade step
        #   @PostStatus:    none
        #   @eg:            excute()
        #   @return:        True on success, False on failure, or the status
        #                   code 98 / 99 passed through from upgrade()
        #===================================================================================
        '''
        # The index itself is not used; the lookup is kept because it raises
        # when this step is not listed in "step_list".
        self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app")

        # NOTE: the image download step that used to run here is disabled;
        # upgrade() reads the images from params_dict["img_path"].

        # Run the upgrade.
        try:
            return_code = self.upgrade()
            if not return_code:
                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
                logger.printLog(CONSTANT.ENVERRMESSAGE)
                return False
            if return_code == 98 or return_code == 99:
                return return_code
            return True
        except Exception as e:
            logger.error(e)
            # Bare raise keeps the original traceback.
            raise

    @dec_stepmsg("upgrade")
    @timeout(3600)
    def upgrade(self):
        '''
        #===================================================================================
        #   @Method:        upgrade(self)
        #   @Precondition:  none
        #   @Func:          Upgrade business logic: take the task lock, flash
        #                   the board, reboot it, release the lock, then run
        #                   the test selected by "UpgradeTestType"
        #   @PostStatus:    none
        #   @eg:            upgrade()
        #   @return:        True, False, or the status code 98 / 99
        #===================================================================================
        '''
        global local_image_path, loader_tool_path, sn, LocationID, test_num, system_type
        system_type = platform.system()
        # The address is only logged, so a resolution failure must not abort the upgrade.
        try:
            ipaddress = socket.gethostbyname(socket.gethostname())
        except OSError as e:
            logger.error(e)
            ipaddress = "unknown"
        logger.printLog("******系统ip为:%s ******" % ipaddress)
        logger.printLog("******系统为:%s ******" % system_type)
        is_windows = system_type == "Windows"
        if is_windows:
            lock_file = r'C:/deviceupgrade/task.lock'
        else:
            lock_file = '/home/openharmony/deviceupgrade/task.lock'
        # Do not continue while a previous task still holds the lock.
        if not is_can_exec(lock_file):
            return False

        # From here on the lock is ours; the finally block releases it on
        # every exit path, including exceptions raised while flashing.
        try:
            version_savepath = self.params_dict.get("img_path")
            upgrade_test_type = self.params_dict.get("UpgradeTestType")
            sn = self.params_dict.get("sn")
            LocationID = self.params_dict.get("LocationID")
            test_num = self.params_dict.get("test_num")
            pr_url = self.params_dict.get("pr_url")
            if not version_savepath:
                logger.error("img_path is not configured, can not upgrade")
                return False
            logFilePath = self.logFilePath
            logger.info(logFilePath)
            report_path = os.path.dirname(logFilePath)
            logger.info(report_path)
            scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
            logger.info(scriptpath)
            local_image_path = version_savepath
            logger.info(local_image_path)
            tool_name = "upgrade_tool.exe" if is_windows else "upgrade_tool"
            loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", tool_name)
            logger.info(loader_tool_path)
            mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py")
            archive_path = version_savepath

            if not self.check_devices_mode():
                f = send_times("hdc list targets")
                logger.info(f)
                if not f or "Empty" in f:
                    logger.error("No devices found,please check the device.")
                    return False
                logger.info("3568 board is connected.")
                # NOTE(review): this path reports the result of one more
                # loader-mode check without flashing anything -- kept as in
                # the original, confirm this is intended.
                return self.check_devices_mode()

            flash_result = self._flash_board()
            if flash_result is not True:
                return flash_result
        finally:
            delete_file_lock(lock_file)

        logger.printLog("******下载完成,升级成功,开始进行冒烟测试******")
        hdc_kill()
        if upgrade_test_type == "null":
            return True
        return self._run_post_flash_tests(upgrade_test_type, report_path, mini_path, archive_path, pr_url)

    def _flash_board(self):
        '''
        Flash loader, partition table, uboot and partition images, then reboot.

        Reads the module globals set by upgrade().
        @return: True when the board was flashed and rebooted, False on
                 failure, 98 when a download failed but the board is still
                 in Loader mode
        '''
        # Download the loader.
        upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path)
        loader_out = sendCmd(upgrde_loader_cmd)
        logger.info(loader_out)
        if "Upgrade loader ok" not in loader_out:
            logger.error("Download MiniLoaderAll.bin Fail!")
            return False
        logger.printLog("Download MiniLoaderAll.bin Success!")

        write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path)
        gpt_out = sendCmd(write_gpt_cmd)
        logger.info(gpt_out)
        if "Write gpt ok" not in gpt_out:
            logger.error("Failed to execute the parameter.txt")
            return False
        logger.printLog("Successfully executed parameter.txt.")

        download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % (
            loader_tool_path, LocationID, local_image_path, local_image_path)
        uboot_out = sendCmd(download_uboot_cmd)
        logger.info(uboot_out)
        if "Download image ok" not in uboot_out:
            logger.error("Failed to download the uboot.image!")
            return 98 if self.check_devices_mode() else False
        logger.printLog("The uboot.image downloaded successfully!")

        # flash_version() may return 98, which is truthy: test it explicitly
        # so a failed flash is never mistaken for success.
        flash_result = self.flash_version()
        if flash_result == 98:
            return 98
        if not flash_result:
            return False

        reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID)
        reboot_result = sendCmd(reboot_devices_cmd)
        logger.info(reboot_result)
        if "Reset Device OK" not in reboot_result:
            logger.error("Failed to reboot the board!")
            return False
        logger.info("Reboot successfully!")
        # Fixed wait after the reset command before the board is used again.
        time.sleep(50)
        return True

    def _run_post_flash_tests(self, upgrade_test_type, report_path, mini_path, archive_path, pr_url):
        '''
        Run the test selected by upgrade_test_type after a successful flash.

        @param upgrade_test_type: "mini_system_test", "smoke_test" or empty
                                  (empty is handled like "smoke_test")
        @param report_path: directory under which "result" is created
        @param mini_path: path of the mini system test script
        @param archive_path: value forwarded as --archive_path
        @param pr_url: value forwarded as --pr to the smoke test
        @return: True, False, or the status code 98 / 99
        '''
        screenshot_path = os.path.join(local_image_path, "screenshot")
        script_path = os.path.join(screenshot_path, 'new_script')
        logger.info(script_path)
        py_file = "main.py"
        new_report_path = os.path.join(report_path, "result")
        logger.info(new_report_path)
        # NOTE(review): random delay kept from the original, presumably to
        # stagger jobs that start at the same time -- confirm.
        time.sleep(random.randint(3, 7))
        try:
            os.makedirs(new_report_path, exist_ok=True)
        except OSError as e:
            logger.error(e)
            return 98

        if upgrade_test_type == "mini_system_test":
            if exec_cmd(mini_path, sn, new_report_path, archive_path) == 98:
                return 98
            return True

        if not upgrade_test_type or upgrade_test_type == "smoke_test":
            # Enter the project directory and always return to the previous one.
            cur_path = os.getcwd()
            os.chdir(script_path)
            try:
                test_return = cmd_test(script_path, py_file, sn, test_num, new_report_path, pr_url)
            finally:
                os.chdir(cur_path)
            if test_return == 1:
                return True
            if test_return == 98 or test_return == 99:
                return test_return
            return False

        logger.error("unsupported UpgradeTestType: %s" % upgrade_test_type)
        return False

    @timeout(1000)
    def flash_version(self):
        '''
        Flash every partition image that exists under local_image_path.

        A failed download is retried once. Missing images are skipped.
        @return: True when all present images were flashed, 98 when an image
                 failed twice but the board is still in Loader mode, False
                 otherwise
        '''
        partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"]
        for part in partList:
            if not os.path.exists("%s/%s.img" % (local_image_path, part)):
                logger.printLog("%s.img is not exist, ignore" % part)
                continue
            loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, part, local_image_path, part)
            output = sendCmd(loadcmd)
            logger.info(output)
            if "Download image ok" not in output:
                logger.info("try download %s again!" % part)
                time.sleep(1)
                output = sendCmd(loadcmd)
                logger.info(output)
                if "Download image ok" not in output:
                    logger.printLog("Failed to download the %s.img!" % part)
                    return 98 if self.check_devices_mode() else False
            # Reached after the first attempt or after a successful retry;
            # the loop then continues with the next partition.
            logger.printLog("The %s.img downloaded successfully!" % part)
        return True

    @timeout(120)
    def check_devices_mode(self):
        '''
        Check whether the board identified by LocationID is in Loader mode.

        Up to 5 attempts: list devices with the upgrade tool; when the board
        is not reported in Loader mode, ask it through hdc to reboot into the
        loader and wait 8 seconds before the next attempt.
        @return: True when Loader mode is reported, False after 5 attempts
        '''
        check_mode_cmd = "%s LD" % loader_tool_path
        expected = "LocationID=%s Mode=Loader" % LocationID
        for _ in range(5):
            listing = sendCmd(check_mode_cmd)
            logger.info(listing)
            if expected in listing:
                logger.info("3568 board has entered the Loader mode successfully!")
                return True
            os.system("hdc -t %s shell reboot loader" % sn)
            time.sleep(8)
        logger.error("Failed to enter the loader mode!")
        return False


@timeout(30)
def hdc_kill():
    """
    Restart the hdc server: "hdc kill", a 2 second pause, then "hdc -l5 start".
    """
    logger.info("kill the process")
    os.system("hdc kill")
    time.sleep(2)
    logger.info("start the process")
    os.system("hdc -l5 start")


def sendCmd(mycmd):
    """
    Run a shell command and return everything it wrote to stdout.
    @param mycmd: command line passed to os.popen
    @return: captured output as one string
    """
    # The with-block closes the pipe; the original left it open.
    with os.popen(mycmd) as pipe:
        return pipe.read()


def send_times(mycmd):
    """
    Run a command, re-running it up to 3 more times while the output is
    empty or contains "Empty".
    @param mycmd: command line passed to sendCmd
    @return: output of the last run
    """
    outcome = sendCmd(mycmd)
    for _ in range(3):
        # The 3 second pause is kept on both the success and the retry path.
        time.sleep(3)
        if outcome and "Empty" not in outcome:
            return outcome
        outcome = sendCmd(mycmd)
    return outcome


@timeout(180)
def start_cmd(sn):
    """
    Start the hdc server and start hilog persistence on the device.
    @param sn: device serial number passed to "hdc -t"
    @return: True when the commands ran, False when an exception was raised
    """
    try:
        os.system("hdc -l5 start")
        hilog_cmd = "hdc -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn
        logger.info(hilog_cmd)
        hilog_result = sendCmd(hilog_cmd)
        logger.info(hilog_result)
        return True
    except Exception as e:
        logger.error(e)
        return False


@timeout(900)
def cmd_test(screenshot_path, py_file, device_num, test_num, new_report_path, pr):
    """
    Run the smoke test script and record how long it took in total_time.
    @param screenshot_path: script directory, forwarded to outCmd
    @param py_file: script file name run with "python"
    @param device_num: value of --device_num
    @param test_num: value of --test_num
    @param new_report_path: directory receiving screenshot_result / screenshot_base
    @param pr: value of --pr
    @return: True, False, or the status code 98 / 99
    """
    global total_time
    save_screenshot_path = os.path.join(new_report_path, "screenshot_result")
    logger.info(save_screenshot_path)
    # NOTE(review): random delay kept from the original -- purpose not
    # visible in this file.
    time.sleep(random.randint(1, 5))
    base_screenshot_path = os.path.join(new_report_path, "screenshot_base")
    try:
        # exist_ok avoids failing when another run creates the directory first.
        os.makedirs(save_screenshot_path, exist_ok=True)
        logger.info(save_screenshot_path)
        os.makedirs(base_screenshot_path, exist_ok=True)
        logger.info(base_screenshot_path)
    except OSError as e:
        logger.error(e)
        return 98
    py_cmd = "python {} --device_num {} --test_num {} --save_path {} --pr {}".format(py_file, device_num, test_num, save_screenshot_path, pr)
    time1 = time.time()
    result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, screenshot_path)
    time2 = time.time()
    total_time = int(time2 - time1)
    logger.info("total_time: %s" % total_time)
    if result == 1:
        return True
    if result == 98 or result == 99:
        return result
    return False


@timeout(900)
def outCmd(cmd, save_screenshot_path, base_screenshot_path, script_path):
    """
    Run the smoke test command, log its output and map it to a result.
    @param cmd: shell command line
    @param save_screenshot_path: only used in the failure log message
    @param base_screenshot_path: directory receiving the *jpeg files
    @param script_path: directory whose "resource" sub-directory is copied from
    @return: True on success, False when the exit code is 101, 99 on fatal
             problems or a read error, 98 on any other failure
    """
    logger.info("cmd is: %s" % cmd)
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8', errors='ignore', universal_newlines=True)
    curline = p.stdout.readline()
    try:
        while "End of check" not in curline:
            curline = p.stdout.readline()
            if not curline:
                # EOF: the script ended without printing "End of check".
                # Without this the loop would spin until the timeout fires.
                break
            logger.info(curline)
            if 'SmokeTest: End of check, test succeeded!' in curline:
                return True
            if "SmokeTest find some fatal problems" in curline:
                logger.error("SmokeTest find some fatal problems!")
                return 99
    except Exception as e:
        logger.error(e)
        logger.error("execute smoke_test.py failed!")
        return 99
    # Best effort: copy the jpeg files from the script's resource directory.
    try:
        resource_path = os.path.join(script_path, 'resource')
        for jpeg_file in os.listdir(resource_path):
            if jpeg_file.endswith('jpeg'):
                shutil.copy(os.path.join(resource_path, jpeg_file),
                            os.path.join(base_screenshot_path, jpeg_file))
    except Exception as t:
        logger.info(t)
    p.wait()
    logger.info("p.returncode %s" % p.returncode)
    if p.returncode == 0:
        logger.info("screenshot check is ok!")
        return True
    if p.returncode == 101:
        logger.error("device disconnection, please check the device!")
        return False
    logger.error("screenshot test failed, check the %s" % save_screenshot_path)
    return 98


@timeout(1000)
def exec_cmd(mini_path, sn, save_path, archive_path):
    """
    Run the mini system test script and log its output.
    @param mini_path: path of the test script
    @param sn: value of --device_num
    @param save_path: value of --save_path
    @param archive_path: value of --archive_path
    @return: True when the script exits with 0, otherwise 98
    """
    # An argument list works on every platform; a command string without
    # shell=True is taken as the program name on POSIX.
    cmd = ["python", str(mini_path), "--device_num", str(sn),
           "--save_path", str(save_path), "--archive_path", str(archive_path)]
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk", errors="ignore")
    curline = p.stdout.readline()
    try:
        while "End of check" not in curline:
            curline = p.stdout.readline()
            if not curline:
                # EOF: stop instead of looping on empty reads.
                break
            logger.info(curline)
    except Exception as e:
        logger.error(e)
    p.wait()
    logger.info("p.returncode %s" % p.returncode)
    if p.returncode == 0:
        logger.info("mini_system_test is ok!")
        return True
    logger.error("mini_system_test failed!")
    return 98


@timeout(1000)
def is_can_exec(lock_file):
    """
    Decide whether the upgrade may run, taking the lock when it may.

    A lock file younger than the lock duration (10 minutes, or the number of
    seconds in the environment variable "wait_time") blocks execution; an
    older one is replaced.
    @param lock_file: path of the lock file
    @return: True when the lock was taken, False when a task is running
    """
    lock_duration = 10 * 60  # 10 minutes, in seconds
    wait_time = os.getenv('wait_time')
    if wait_time is not None:
        try:
            lock_duration = int(wait_time)
        except ValueError:
            logger.warning("invalid wait_time %r, use default %s second" % (wait_time, lock_duration))
    # Read the modification time of the lock file; a missing file means no lock.
    try:
        lock_time = os.path.getmtime(lock_file)
    except OSError:
        logger.info("no tasks are being executed")
        create_file_lock(lock_file)
        return True
    # Check whether the lock has expired.
    if (time.time() - lock_time) < lock_duration:
        logger.error("ask is already running. Exiting.")
        return False
    logger.warning("ask running time is more than %s second, can exec" % lock_duration)
    delete_file_lock(lock_file)
    create_file_lock(lock_file)
    return True


def create_file_lock(lock_file):
    """
    Create the lock file, creating its directory when needed.
    @param lock_file: path of the lock file
    """
    directory = os.path.dirname(lock_file)
    if directory:
        os.makedirs(directory, exist_ok=True)
    logger.info("create_file_lock")
    with open(lock_file, 'w') as f:
        f.write('locked, please can not delete')


def delete_file_lock(lock_file):
    """
    Delete the lock file; a missing file is not an error.
    @param lock_file: path of the lock file
    """
    logger.info("delete_file_lock")
    try:
        os.remove(lock_file)
    except FileNotFoundError:
        pass