• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6import time
7import re
8import shutil
9import random
10
11from core.base import BaseApp, dec_stepmsg
12from util.file_locker import FileLock
13from util.log_info import logger
14from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
15from aw.Download.Download import *
16from aw.Common.Constant import CONSTANT
17from aw.Common.Common import getFileName
18from aw.ExtractFile.ExtractFile import *
19from aw.Common.Common import getHostIp, copyFile, copyDirectory
# Duration of the most recent smoke-test run; cmd_test() replaces this initial
# empty string with the elapsed time in whole seconds (an int).
total_time = ""
# Names taken from the shared CONSTANT.File table. In the visible part of this
# file they are referenced only by the commented-out download logic
# (presumably lock / success / failure marker file names - confirm).
lock_suffix = CONSTANT.File.LOCK_SUFFIX
suc_file = CONSTANT.File.SUC_FILE
failed_file = CONSTANT.File.FAILED_FILE
# NOTE(review): not referenced anywhere in the visible code and the unit is
# not stated - confirm before relying on it.
REBOOT_TIMEOUT = 20000000
25
26
27
class liteOsUpgrade_RK3568(BaseApp):
    '''
    Flash step for an RK3568 board: pushes the loader, partition table, uboot
    and partition images with the Rockchip upgrade tool, reboots the board and
    then optionally runs the mini-system or smoke test.

    @author: cwx1076044
    '''

    def __init__(self, param_file):
        """Load the step parameters and declare the keys this step requires."""
        super().__init__(param_file)
        self.param_List = ["upgrade_upgradeLocation", "sn"]

    @dec_stepmsg("hongmeng RK3568 flash")
    def excute(self):
        '''
        #===================================================================================
        #   @Method:        excute(self)
        #   @Precondition:  none
        #   @Func:          entry point of the upgrade step
        #   @PostStatus:    none
        #   @eg:            excute()
        #   @return:        True on success, False on failure, or the 98 / 99
        #                   status codes returned by upgrade() passed through
        #                   unchanged
        #===================================================================================
        '''
        # The index itself is not used, but the lookup raises if this step is
        # missing from the configured step list, so the call is kept.
        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app")

        # The image download stage is not run from here; only the upgrade is.
        try:
            return_code = self.upgrade()
            if not return_code:
                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
                logger.printLog(CONSTANT.ENVERRMESSAGE)
                return False
            if return_code in (98, 99):
                return return_code
            return True
        except Exception as e:
            logger.error(e)
            raise

    @dec_stepmsg("upgrade")
    @timeout(3600)
    def upgrade(self):
        '''
        #===================================================================================
        #   @Method:        upgrade(self)
        #   @Precondition:  none
        #   @Func:          flash the board, reboot it and run the requested test
        #   @PostStatus:    publishes local_image_path, loader_tool_path, sn,
        #                   LocationID and test_num as module globals (read by
        #                   flash_version() and check_devices_mode())
        #   @eg:            upgrade()
        #   @return:        True on success; False on failure; 98 when a step
        #                   failed but the board is still detected in Loader
        #                   mode, when the result directory cannot be created,
        #                   or when a test returned 98; 99 when the smoke test
        #                   returned 99; None when UpgradeTestType is not one of
        #                   the handled values
        #===================================================================================
        '''
        global local_image_path, loader_tool_path, sn, LocationID, test_num
        version_savepath = self.params_dict.get("img_path")
        upgrade_test_type = self.params_dict.get("UpgradeTestType")
        sn = self.params_dict.get("sn")
        LocationID = self.params_dict.get("LocationID")
        test_num = self.params_dict.get("test_num")
        pr_url = self.params_dict.get("pr_url")
        logFilePath = self.logFilePath
        logger.info(logFilePath)
        # The report directory is everything before the last backslash of the
        # log file path (Windows-style path expected).
        r = logFilePath.rfind("\\")
        report_path = logFilePath[:r]
        logger.info(report_path)
        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
        logger.info(scriptpath)
        local_image_path = os.path.join(version_savepath)
        logger.info(local_image_path)
        loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool.exe")
        logger.info(loader_tool_path)
        mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py")
        archive_path = os.path.join(version_savepath)

        if not self.check_devices_mode():
            f = send_times("hdc list targets")
            logger.info(f)
            if not f or "Empty" in f:
                logger.error("No devices found,please check the device.")
                return False
            logger.info("3568 board is connected.")
            # NOTE(review): this path returns the result of a second mode check
            # without flashing anything - confirm that this is intended.
            return self.check_devices_mode()

        # Download the loader.
        upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path)
        h = sendCmd(upgrde_loader_cmd)
        logger.info(h)
        if "Upgrade loader ok" not in h:
            logger.error("Download MiniLoaderAll.bin Fail!")
            return False
        logger.printLog("Download MiniLoaderAll.bin Success!")

        # Write the partition table.
        write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path)
        j = sendCmd(write_gpt_cmd)
        logger.info(j)
        if "Write gpt ok" not in j:
            logger.error("Failed to execute the parameter.txt")
            return False
        logger.printLog("Successfully executed parameter.txt.")

        # Download uboot.
        download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % (
            loader_tool_path, LocationID, local_image_path, local_image_path)
        k = sendCmd(download_uboot_cmd)
        logger.info(k)
        if "Download image ok" not in k:
            logger.error("Failed to download the uboot.image!")
            if self.check_devices_mode():
                return 98
            return False
        logger.printLog("The uboot.image downloaded successfully!")

        # Download the partition images. flash_version() may return 98, which
        # is truthy, so it has to be checked explicitly; otherwise a failed
        # flash would go on to reboot the board as if it had succeeded.
        flash_result = self.flash_version()
        if not flash_result:
            return False
        if flash_result == 98:
            return 98

        reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID)
        reboot_result = sendCmd(reboot_devices_cmd)
        logger.info(reboot_result)
        time.sleep(30)
        try:
            if upgrade_test_type != "mini_system_test":
                if not start_cmd(sn):
                    if self.check_devices_mode():
                        return 98
                    return False
        except Exception as t:
            logger.info(t)
            if self.check_devices_mode():
                return 98
            return False
        time.sleep(10)
        if "Reset Device OK" not in reboot_result:
            logger.error("Failed to reboot the board!")
            return False

        logger.info("Reboot successfully!")
        logger.printLog("******下载完成,升级成功,开始进行冒烟测试******")
        os.system("hdc_std -t %s shell hilog -w start" % sn)
        os.system("hdc_std -t %s shell hilog -w start -t kmsg" % sn)
        if upgrade_test_type == "null":
            return True

        screenshot_path = os.path.join(local_image_path, "screenshot")
        resource_path = os.path.join(screenshot_path, 'xdevice_smoke')
        logger.info(resource_path)
        py_path = "main.py"
        new_report_path = os.path.join(report_path, "result")
        logger.info(new_report_path)
        # Random 1-5 s pause before touching the shared result directory.
        time.sleep(random.randint(1, 5))
        try:
            if not os.path.exists(new_report_path):
                os.mkdir(new_report_path)
        except Exception as e:
            logger.error(e)
            return 98

        if upgrade_test_type == "mini_system_test":
            save_path = os.path.join(new_report_path)
            if exec_cmd(mini_path, sn, save_path, archive_path) == 98:
                return 98
            return True

        if not upgrade_test_type or upgrade_test_type == "smoke_test":
            # The smoke test is started from inside its project directory;
            # always switch back, even if the test raises.
            cur_path = os.getcwd()
            os.chdir(resource_path)
            try:
                test_return = cmd_test(resource_path, py_path, new_report_path, resource_path, sn, test_num, pr_url)
            finally:
                os.chdir(cur_path)
            if test_return == 1:
                return True
            if test_return in (98, 99):
                return test_return
            return False

        # Any other UpgradeTestType value is not handled.
        return None

    @timeout(1000)
    def flash_version(self):
        """Download every partition image that exists in the image directory.

        Each image is attempted at most twice. Relies on the module globals
        set by upgrade() (local_image_path, loader_tool_path, LocationID).

        Returns:
            True when all present images were downloaded; 98 when an image
            failed twice but the board is still detected in Loader mode;
            False when an image failed twice and the board is not in Loader
            mode.
        """
        partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"]
        for i in partList:
            if not os.path.exists("%s/%s.img" % (local_image_path, i)):
                logger.printLog("%s.img is not exist, ignore" % i)
                continue
            loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i)
            p = sendCmd(loadcmd)
            logger.info(p)
            if "Download image ok" not in p:
                logger.info("try download %s again!" % i)
                time.sleep(1)
                f = sendCmd(loadcmd)
                logger.info(f)
                if "Download image ok" not in f:
                    logger.printLog("Failed to download the %s.img!" % i)
                    if self.check_devices_mode():
                        return 98
                    return False
            # Reached after a first-try or a retry success; carry on with the
            # next partition instead of returning early.
            logger.printLog("The %s.img downloaded successfully!" % i)
        return True

    @timeout(120)
    def check_devices_mode(self):
        """Check, up to five times, whether the board is listed in Loader mode.

        After every unsuccessful check the board is asked over hdc to reboot
        into the loader, followed by a 5 s wait. Relies on the module globals
        set by upgrade() (loader_tool_path, LocationID, sn).

        Returns:
            True as soon as the tool output lists this LocationID in Loader
            mode, False after five unsuccessful checks.
        """
        check_mode_cmd = "%s LD" % loader_tool_path
        # The tool separates the two fields with a tab character.
        loader_marker = "LocationID=%s\tMode=Loader" % LocationID
        for _ in range(5):
            g = sendCmd(check_mode_cmd)
            logger.info(g)
            if loader_marker in g:
                logger.info("3568 board has entered the Loader mode successfully!")
                return True
            os.system("hdc -t %s shell reboot loader" % sn)
            time.sleep(5)
        logger.error("Failed to enter the loader mode!")
        return False
267
268    # @dec_stepmsg("download")
269    # @timeout(360)
270    # def download(self):
271    #     '''
272    #     #===================================================================================
273    #     #   @Method:        download(self)
274    #     #   @Precondition:  none
275    #     #   @Func:          构建下载到本地的路径,执行相应包的下载
276    #     #   @PostStatus:    none
277    #     #   @eg:            download()
278    #     #   @return:        True or Flase
279    #     #===================================================================================
280    #     '''
281    #     global version_savepath, version_name
282    #     dir_path = CONSTANT.Path.getDirPath()
283    #     if self.params_dict.get("pbiid"):
284    #         version_path = self.params_dict.get("pbiid")
285    #         version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
286    #         version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
287    #     else:
288    #         version_path = self.params_dict.get("upgrade_upgradeLocation")
289    #         version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
290    #         version_savepath = os.path.join(dir_path, version_name, "img")
291    #         # 执行img下载
292    #
293    #     if self.params_dict.get("isDownload") == "True":
294    #         logger.printLog("不需要做下载,直接返回")
295    #         return True
296    #
297    #     import hashlib
298    #     save_file_str = version_path.replace("/", "").replace("\\", "")
299    #     save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
300    #     logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name))
301    #     save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
302    #     logger.info(version_savepath)
303    #     logger.info(save_path_file)
304    #     t = version_savepath[:-4]
305    #     logger.info(t)
306    #     if not self.excutedown(version_path, version_savepath, save_path_file, False):
307    #         logger.info("download again!")
308    #         try:
309    #             if os.path.exists(t):
310    #                 shutil.rmtree(t)
311    #                 logger.info("remove dir succeed")
312    #             if os.path.exists(save_path_file):
313    #                 logger.info("remove file succeed")
314    #                 os.remove(save_path_file)
315    #         except Exception as p:
316    #             logger.error(p)
317    #             raise Exception(p)
318    #         time.sleep(15)
319    #         if not self.excutedown(version_path, version_savepath, save_path_file, False):
320    #             logger.error("download img fail")
321    #             return False
322    #
323    #     # 保存本地版本路径给devicetest去版本路径下取用例
324    #     saveVersion(save_path_file, version_savepath)
325    #     return True
326    #
327    # def excutedown(self, source_path, download_dir, suc_mark, is_file):
328    #     '''
329    #     #===================================================================================
330    #     #   @Method:        excutedown(source_path, download_dir, suc_mark, is_file)
331    #     #   @Precondition:  none
332    #     #   @Func:          执行下载动作
333    #     #   @PostStatus:    none
334    #     #   @Param:         source_path:资源文件路径
335    #     #                   download_dir:文件下载到本地的文件夹路径
336    #     #                   is_file:是否是文件
337    #     #   @eg:            excutedown("xxxx", "D:\\local\\image", suc_mark, Flase)
338    #     #   @return:        True or Flase
339    #     #===================================================================================
340    #     '''
341    #     failed_mark = os.path.join(download_dir, failed_file)
342    #     lock_path = os.path.join(download_dir, lock_suffix)
343    #     file_lock = FileLock()
344    #
345    #     if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
346    #         return True
347    #     try:
348    #         nowtime = get_now_time_str_info()
349    #         logger.printLog("%s Downloading, please wait" % nowtime)
350    #         file_lock.lockFile(lock_path)
351    #         ret = ""
352    #         logger.info("Get lock. Start to ")
353    #         try:
354    #             if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
355    #                 ret = downloadByBitComet(source_path, download_dir, os_method)
356    #             elif source_path.startswith('\\\\'):
357    #                 ret = downloadByCopy(source_path, download_dir, is_file)
358    #             elif self.params_dict.get("pbiid"):
359    #                 ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT",
360    #                                              self.params_dict.get("pbiid"))
361    #             elif source_path.startswith("http"):
362    #                 ret = run_download(source_path, download_dir)
363    #         except Exception as f:
364    #             logger.error(f)
365    #
366    #         if source_path.endswith(".zip"):
367    #             zip_name = os.path.basename(source_path)
368    #             ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
369    #         if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)):
370    #             if source_path.startswith("http") and ("file_id=" in source_path):
371    #                 if source_path.endswith(".tar.gz"):
372    #                     zip_name = source_path.split('=')[-1]
373    #                 else:
374    #                     zip_name = "out.tar.gz"
375    #             else:
376    #                 zip_name = os.path.basename(source_path)
377    #             ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
378    #         nowtime = get_now_time_str_info()
379    #         logger.printLog("%s download to %s end" % (nowtime, download_dir))
380    #
381    #         if not ret:
382    #             with open(failed_mark, "a+") as fp:
383    #                 fp.write("")
384    #         return ret
385    #     except Exception as e:
386    #         logger.printLog(e)
387    #         #raise Exception(e)
388    #     finally:
389    #         file_lock.releaseFile()
390
391
@timeout(30)
def hdc_kill():
    """Stop the hdc process and start it again, pausing 2 s in between."""
    # (log message, shell command, pause in seconds after the command)
    steps = (
        ("kill the process", "hdc kill", 2),
        ("start the process", "hdc -l5 start", 0),
    )
    for message, command, pause in steps:
        logger.info(message)
        os.system(command)
        if pause:
            time.sleep(pause)
400
401
def sendCmd(mycmd):
    """Run *mycmd* through the shell and return everything it wrote to stdout.

    Args:
        mycmd: command line to execute.

    Returns:
        The command's standard output as one string (empty if it printed
        nothing).
    """
    # The context manager closes the pipe once the output has been read, so
    # no file handle is left open per call.
    with os.popen(mycmd) as pipe:
        return pipe.read()
405
406
def send_times(mycmd):
    """Run *mycmd*, retrying up to three more times while it reports nothing.

    An attempt counts as failed when the output is empty or contains
    "Empty". Each retry is preceded by a 3 s wait and actually re-runs the
    command. A successful result is followed by a further 3 s wait before
    it is returned.

    Args:
        mycmd: command line to execute.

    Returns:
        The output of the last attempt (possibly still empty or containing
        "Empty" when every attempt failed).
    """
    outcome = sendCmd(mycmd)
    retries = 0
    while (not outcome or "Empty" in outcome) and retries < 3:
        retries += 1
        time.sleep(3)
        outcome = sendCmd(mycmd)
    if outcome and "Empty" not in outcome:
        time.sleep(3)
    return outcome
418
419
@timeout(180)
def start_cmd(sn):
    """Start hdc, set the device power mode and start hilog persistence.

    The power-mode command is retried every 4 s, up to 20 times, until its
    output contains "Set Mode Success".

    Args:
        sn: device serial number passed to ``hdc -t``.

    Returns:
        True when the power mode was set (the hilog command is then issued and
        its output logged); False when the first power command printed
        nothing, when the mode could not be set within the retry budget, or
        when any exception was raised.
    """
    max_retries = 20
    try:
        os.system("hdc -l5 start")
        power_cmd = "hdc -t %s shell \"power-shell setmode 602\"" % sn
        hilog_cmd = "hdc -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn
        logger.info(power_cmd)
        logger.info(hilog_cmd)
        power_result = sendCmd(power_cmd)
        logger.info(power_result)
        if not power_result:
            return False
        retries = 0
        while "Set Mode Success" not in power_result and retries < max_retries:
            time.sleep(4)
            power_result = sendCmd(power_cmd)
            logger.info(power_result)
            retries += 1
        # Decide on the final output, so a success on the last retry counts.
        if "Set Mode Success" not in power_result:
            logger.error("Set mode failed")
            return False
        hilog_result = sendCmd(hilog_cmd)
        logger.info(hilog_result)
        return True
    except Exception as e:
        logger.error(e)
        return False
447
448
@timeout(900)
def cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url):
    """Prepare the screenshot directories and run the smoke-test script.

    Creates ``screenshot_result`` and ``screenshot_base`` under
    *new_report_path* if they are missing, builds the python command line,
    runs it through outCmd() and stores the elapsed whole seconds in the
    module-level ``total_time``.

    Returns:
        98 when a directory cannot be created; otherwise True when outCmd()
        returned 1/True, 98 or 99 passed through unchanged, and False for any
        other result.
    """
    global total_time
    save_screenshot_path = os.path.join(new_report_path, "screenshot_result")
    logger.info(save_screenshot_path)
    # Random 1-5 s pause before creating the directories.
    time.sleep(random.randint(1, 5))
    base_screenshot_path = os.path.join(new_report_path, "screenshot_base")
    try:
        for directory in (save_screenshot_path, base_screenshot_path):
            if not os.path.exists(directory):
                os.mkdir(directory)
            logger.info(directory)
    except Exception as e:
        logger.error(e)
        return 98

    config_path = os.path.join(screenshot_path, "resource", "app_capture_screen_test_config.json")
    cmd_args = (py_path, config_path, resource_path, save_screenshot_path, sn, test_num, screenshot_path, pr_url)
    py_cmd = "python %s --config %s --anwser_path %s --save_path %s --device_num %s --test_num %s --tools_path %s --pr_url %s" \
             % cmd_args

    started = time.time()
    result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, resource_path)
    finished = time.time()
    total_time = int(finished - started)
    logger.info("total_time: %s" % total_time)

    if result == 1:
        return True
    if result == 98:
        return 98
    if result == 99:
        return 99
    return False
483
484
@timeout(900)
def outCmd(cmd, save_screenshot_path, base_screenshot_path, resource_path):
    """Run the smoke-test command and evaluate its output and exit code.

    Output lines are logged and scanned until a line containing
    "End of check" appears or the process closes its output. For every line
    containing "abnarmal" the image name (fourth space-separated field, up to
    the first dot) is remembered, and ``<name>.jpeg`` is afterwards copied
    from *resource_path* to *base_screenshot_path*.

    Args:
        cmd: command line to execute.
        save_screenshot_path: only mentioned in the failure log message.
        base_screenshot_path: destination directory for flagged screenshots.
        resource_path: directory the flagged screenshots are copied from.

    Returns:
        99 when the output reports fatal problems or reading it raised;
        True when the process exits with 0; False when it exits with 101;
        98 for any other exit code.
    """
    logger.info("cmd is: %s" % cmd)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk")
    list_png_name = []
    try:
        # readline() returns "" at EOF; stopping there avoids spinning forever
        # when the script exits without printing "End of check".
        for curline in iter(p.stdout.readline, ""):
            logger.info(curline)
            if "abnarmal" in curline:
                png_name = curline.split(" ")[3].split(".")[0]
                list_png_name.append(png_name)
            if "SmokeTest find some fatal problems" in curline:
                logger.error("SmokeTest find some fatal problems!")
                return 99
            if "End of check" in curline:
                break
    except Exception as e:
        logger.error(e)
        logger.error("execute smoke_test.py failed!")
        return 99
    flagged = list(set(list_png_name))
    if flagged:
        logger.error(flagged)
    try:
        # Best effort: the first copy failure is logged and ends the copying.
        for name in flagged:
            result = os.path.join(resource_path, "%s.jpeg" % name)
            base = os.path.join(base_screenshot_path, "%s.jpeg" % name)
            shutil.copy(result, base)
    except Exception as t:
        logger.info(t)
    p.wait()
    logger.info("p.returncode %s" % p.returncode)
    if p.returncode == 0:
        logger.info("screenshot check is ok!")
        return True
    if p.returncode == 101:
        logger.error("device disconnection, please check the device!")
        return False
    logger.error("screenshot test failed, check the %s" % save_screenshot_path)
    return 98
525
526
@timeout(1000)
def exec_cmd(mini_path, sn, save_path, archive_path):
    """Run the mini-system test script and report whether it passed.

    Output lines are logged until a line containing "End of check" appears or
    the process closes its output; errors while reading are logged and do not
    abort the evaluation of the exit code.

    Args:
        mini_path: path of the test script to run with ``python``.
        sn: device serial number (``--device_num``).
        save_path: value passed as ``--save_path``.
        archive_path: value passed as ``--archive_path``.

    Returns:
        True when the process exits with 0, otherwise 98.
    """
    cmd = "python %s --device_num %s --save_path %s --archive_path %s" % (mini_path, sn, save_path, archive_path)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk")
    try:
        # readline() returns "" at EOF; stopping there avoids spinning forever
        # when the script exits without printing "End of check".
        for curline in iter(p.stdout.readline, ""):
            logger.info(curline)
            if "End of check" in curline:
                break
    except Exception as e:
        logger.error(e)
    p.wait()
    logger.info("p.returncode %s" % p.returncode)
    if p.returncode == 0:
        logger.info("mini_system_test is ok!")
        return True
    logger.error("mini_system_test failed!")
    return 98