• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding:utf-8 -*-
2import uuid
3import sys
4import subprocess
5import os
6import time
7import re
8import shutil
9import random
10
11
12from core.base import BaseApp, dec_stepmsg
13from util.file_locker import FileLock
14from util.log_info import logger
15from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
16from aw.Download.Download import *
17from aw.Common.Constant import CONSTANT
18from aw.Common.Common import getFileName
19from aw.ExtractFile.ExtractFile import *
20from aw.Common.Common import getHostIp, copyFile, copyDirectory
21
22lock_suffix = CONSTANT.File.LOCK_SUFFIX
23suc_file = CONSTANT.File.SUC_FILE
24failed_file = CONSTANT.File.FAILED_FILE
25REBOOT_TIMEOUT = 20000000
26
27
28class liteOsUpgrade_RK3568(BaseApp):
29    '''
30    @author: cwx1076044
31    '''
32
33    def __init__(self, param_file):
34        super().__init__(param_file)
35        self.param_List = ["upgrade_upgradeLocation", "sn"]
36
37    @dec_stepmsg("hongmeng RK3568 flash")
38    def excute(self):
39        '''
40        #===================================================================================
41        #   @Method:        excute(self)
42        #   @Precondition:  none
43        #   @Func:          升级执行入口
44        #   @PostStatus:    none
45        #   @eg:            excute()
46        #   @return:        True or Flase
47        #===================================================================================
48        '''
49        step_index = self.params_dict.get("step_list").index("liteOsUpgrade_RK3568_app")
50
51        # 执行下载
52        try:
53            if not self.download():
54                CONSTANT.ENVERRMESSAGE = "image download fail"
55                logger.printLog(CONSTANT.ENVERRMESSAGE)
56                return False
57        except Exception as e:
58            logger.error(e)
59            raise e
60
61        # 执行升级
62        try:
63            return_code = self.upgrade()
64            if not return_code:
65                CONSTANT.ENVERRMESSAGE = "board upgrade fail"
66                logger.printLog(CONSTANT.ENVERRMESSAGE)
67                return False
68            if return_code == 98:
69                return 98
70            if return_code == 99:
71                return 99
72            return True
73        except Exception as e:
74            logger.error(e)
75            raise e
76
77    @dec_stepmsg("upgrade")
78    @timeout(3600)
79    def upgrade(self):
80        '''
81        #===================================================================================
82        #   @Method:        upgrade(self)
83        #   @Precondition:  none
84        #   @Func:          升级相关业务逻辑
85        #   @PostStatus:    none
86        #   @eg:            upgrade()
87        #   @return:        True or Flase
88        #===================================================================================
89        '''
90        global local_image_path, loader_tool_path, sn, LocationID ,test_num
91        upgrade_test_type = self.params_dict.get("UpgradeTestType")
92        sn = self.params_dict.get("sn")
93        LocationID = self.params_dict.get("LocationID")
94        test_num = self.params_dict.get("test_num")
95        pr_url = self.params_dict.get("pr_url")
96        logFilePath = self.logFilePath
97        logger.info(logFilePath)
98        r = logFilePath.rfind("\\")
99        report_path = logFilePath[:r]
100        logger.info(report_path)
101        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
102        logger.info(scriptpath)
103        local_image_path = os.path.join(version_savepath)
104        logger.info(local_image_path)
105        loader_tool_path = os.path.join(scriptpath, "resource", "RK3568_tool", "upgrade_tool.exe")
106        logger.info(loader_tool_path)
107        mini_path = os.path.join(local_image_path, "mini_system_test", "L2_mini_system_test.py")
108        archive_path = os.path.join(version_savepath)
109        if not self.check_devices_mode():
110            check_devices_cmd = "hdc_std list targets"
111            f = send_times(check_devices_cmd)
112            logger.info(f)
113            if not f or "Empty" in f:
114                logger.error("No devices found,please check the device.")
115                return False
116            else:
117                logger.info("3568 board is connected.")
118                return self.check_devices_mode()
119        else:
120            # 下載鏡像
121            upgrde_loader_cmd = "%s -s %s UL %s/MiniLoaderAll.bin -noreset" % (loader_tool_path, LocationID, local_image_path)
122            h = sendCmd(upgrde_loader_cmd)
123            logger.info(h)
124            if "Upgrade loader ok" not in h:
125                logger.error("Download MiniLoaderAll.bin Fail!")
126                return False
127            else:
128                logger.printLog("Download MiniLoaderAll.bin Success!")
129                # time.sleep(3)
130                write_gpt_cmd = "%s -s %s DI -p %s/parameter.txt" % (loader_tool_path, LocationID, local_image_path)
131                j = sendCmd(write_gpt_cmd)
132                logger.info(j)
133                if "Write gpt ok" not in j:
134                    logger.error("Failed to execute the parameter.txt")
135                    return False
136                else:
137                    logger.printLog("Successfully executed parameter.txt.")
138                    # time.sleep(5)
139                    download_uboot_cmd = "%s -s %s DI -uboot %s/uboot.img %s/parameter.txt" % (
140                        loader_tool_path, LocationID, local_image_path, local_image_path)
141                    k = sendCmd(download_uboot_cmd)
142                    logger.info(k)
143                    if "Download image ok" not in k:
144                        logger.error("Failed to download the uboot.image!")
145                        if self.check_devices_mode():
146                            return 98
147                        return False
148                    else:
149                        logger.printLog("The uboot.image downloaded successfully!")
150                        # time.sleep(5)
151                        if not self.flash_version():
152                            return False
153                        reboot_devices_cmd = "%s -s %s RD" % (loader_tool_path, LocationID)
154                        reboot_result = sendCmd(reboot_devices_cmd)
155                        logger.info(reboot_result)
156                        time.sleep(30)
157                        try:
158                            if upgrade_test_type != "mini_system_test":
159                                if not start_cmd(sn):
160                                    if self.check_devices_mode():
161                                        return 98
162                                    return False
163                        except Exception as t:
164                            logger.info(t)
165                            if self.check_devices_mode():
166                                return 98
167                            return False
168                        time.sleep(10)
169                        if "Reset Device OK" not in reboot_result:
170                            logger.error("Failed to reboot the board!")
171                            return False
172                        else:
173                            logger.info("Reboot successfully!")
174                            logger.printLog("******下载完成,升级成功,开始进行冒烟测试******")
175                            if upgrade_test_type == "null":
176                                return True
177                            screenshot_path = os.path.join(local_image_path, "screenshot")
178                            resource_path = os.path.join(screenshot_path, "resource")
179                            logger.info(resource_path)
180                            py_path = os.path.join(resource_path, "capturescreentest.py")
181                            new_report_path = os.path.join(report_path, "result")
182                            logger.info(new_report_path)
183                            time_sleep = random.randint(1, 5)
184                            time.sleep(time_sleep)
185                            try:
186                                if not os.path.exists(new_report_path):
187                                    os.mkdir(new_report_path)
188                            except Exception as e:
189                                logger.error(e)
190                                return 98
191                            if upgrade_test_type == "mini_system_test":
192                                save_path = os.path.join(new_report_path)
193                                if exec_cmd(mini_path, sn, save_path, archive_path) == 98:
194                                    return 98
195                                return True
196                            if not upgrade_test_type or upgrade_test_type == "smoke_test":
197                                test_return = cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url)
198                                if test_return == 1:
199                                    return True
200                                if test_return == 98:
201                                    return 98
202                                if test_return == 99:
203                                    return 99
204                                else:
205                                    return False
206
207    @timeout(1000)
208    def flash_version(self):
209        partList = ["boot_linux", "system", "vendor", "userdata", "resource", "ramdisk", "chipset", "sys-prod", "chip-prod"]
210        for i in partList:
211            if not os.path.exists("%s/%s.img" % (local_image_path, i)):
212                logger.printLog("%s.img is not exist, ignore" % i)
213                continue
214            loadcmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i)
215            p = sendCmd(loadcmd)
216            logger.info(p)
217            # time.sleep(5)
218            if "Download image ok" not in p:
219                logger.info("try download %s again!" % i)
220                time.sleep(1)
221                second_cmd = "%s -s %s DI -%s %s/%s.img" % (loader_tool_path, LocationID, i, local_image_path, i)
222                f = sendCmd(second_cmd)
223                logger.info(f)
224                if "Download image ok" not in f:
225                    logger.printLog("Failed to download the %s.img!" % i)
226                    if self.check_devices_mode():
227                        return 98
228                    else:
229                        return False
230                return True
231            else:
232                logger.printLog("The %s.img downloaded successfully!" % i)
233        return True
234
235    @timeout(120)
236    def check_devices_mode(self):
237        check_times = 0
238        while check_times < 5:
239            check_mode_cmd = "%s LD" % loader_tool_path
240            g = sendCmd(check_mode_cmd)
241            logger.info(g)
242            #time.sleep(40)
243            if "LocationID=%s	Mode=Loader" % LocationID in g:
244                logger.info("3568 board has entered the Loader mode successfully!")
245                return True
246            else:
247                #if test_num != "2/2":
248                #    hdc_kill()
249                os.system("hdc_std -t %s shell reboot loader" % sn)
250                time.sleep(5)
251                check_times += 1
252        logger.error("Failed to enter the loader mode!")
253        return False
254
255    @dec_stepmsg("download")
256    @timeout(360)
257    def download(self):
258        '''
259        #===================================================================================
260        #   @Method:        download(self)
261        #   @Precondition:  none
262        #   @Func:          构建下载到本地的路径,执行相应包的下载
263        #   @PostStatus:    none
264        #   @eg:            download()
265        #   @return:        True or Flase
266        #===================================================================================
267        '''
268        global version_savepath, version_name
269        dir_path = CONSTANT.Path.getDirPath()
270        if self.params_dict.get("pbiid"):
271            version_path = self.params_dict.get("pbiid")
272            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
273            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
274        else:
275            version_path = self.params_dict.get("upgrade_upgradeLocation")
276            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
277            version_savepath = os.path.join(dir_path, version_name, "img")
278            # 执行img下载
279
280        if self.params_dict.get("isDownload") == "True":
281            logger.printLog("不需要做下载,直接返回")
282            return True
283
284        import hashlib
285        save_file_str = version_path.replace("/", "").replace("\\", "")
286        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
287        logger.info("download hash string:%s, hash value:%s" % (save_file_str, save_file_name))
288        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
289        if not self.excutedown(version_path, version_savepath, save_path_file, False):
290            logger.error("download img fail")
291            return 98
292
293        # 保存本地版本路径给devicetest去版本路径下取用例
294        saveVersion(save_path_file, version_savepath)
295        return True
296
297    def excutedown(self, source_path, download_dir, suc_mark, is_file):
298        '''
299        #===================================================================================
300        #   @Method:        excutedown(source_path, download_dir, suc_mark, is_file)
301        #   @Precondition:  none
302        #   @Func:          执行下载动作
303        #   @PostStatus:    none
304        #   @Param:         source_path:资源文件路径
305        #                   download_dir:文件下载到本地的文件夹路径
306        #                   is_file:是否是文件
307        #   @eg:            excutedown("xxxx", "D:\\local\\image", suc_mark, Flase)
308        #   @return:        True or Flase
309        #===================================================================================
310        '''
311        failed_mark = os.path.join(download_dir, failed_file)
312        lock_path = os.path.join(download_dir, lock_suffix)
313        file_lock = FileLock()
314
315        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
316            return True
317        try:
318            nowtime = get_now_time_str_info()
319            logger.printLog("%s Downloading, please wait" % nowtime)
320            file_lock.lockFile(lock_path)
321            ret = ""
322            logger.info("Get lock. Start to ")
323            try:
324                if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
325                    ret = downloadByBitComet(source_path, download_dir, os_method)
326                elif source_path.startswith('\\\\'):
327                    ret = downloadByCopy(source_path, download_dir, is_file)
328                elif self.params_dict.get("pbiid"):
329                    ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT",
330                                                 self.params_dict.get("pbiid"))
331                elif source_path.startswith("http"):
332                    ret = run_download(source_path, download_dir)
333            except Exception as f:
334                logger.error(f)
335                logger.info("下载失败,间隔20秒,尝试再次下载。")
336                time.sleep(20)
337                ret = run_download(source_path, download_dir)
338            if source_path.endswith(".zip"):
339                zip_name = os.path.basename(source_path)
340                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)
341            if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)):
342                if source_path.startswith("http") and ("file_id=" in source_path):
343                    if source_path.endswith(".tar.gz"):
344                        zip_name = source_path.split('=')[-1]
345                    else:
346                        zip_name = "out.tar.gz"
347                else:
348                    zip_name = os.path.basename(source_path)
349                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
350            nowtime = get_now_time_str_info()
351            logger.printLog("%s download to %s end" % (nowtime, download_dir))
352
353            if not ret:
354                with open(failed_mark, "a+") as fp:
355                    fp.write("")
356            return ret
357        except Exception as e:
358            logger.printLog(e)
359            #raise Exception(e)
360        finally:
361            file_lock.releaseFile()
362
363
364@timeout(30)
365def hdc_kill():
366    logger.info("kill the process")
367    os.system("hdc_std kill")
368    time.sleep(2)
369    logger.info("start the process")
370    os.system("hdc_std -l5 start")
371    # time.sleep(10)
372
373
374def sendCmd(mycmd):
375    result = "".join(os.popen(mycmd).readlines())
376    return result
377
378
379def send_times(mycmd):
380    times = 0
381    outcome = sendCmd(mycmd)
382    while times < 3:
383        if not outcome or "Empty" in outcome:
384            times += 1
385            time.sleep(3)
386        else:
387            time.sleep(3)
388            return outcome
389    return outcome
390
391
392@timeout(180)
393def start_cmd(sn):
394    try:
395        os.system("hdc_std -l5 start")
396        power_cmd = "hdc_std -t %s shell \"power-shell setmode 602\"" % sn
397        hilog_cmd = "hdc_std -t %s shell \"hilog -w start -l 400000000 -m none\"" % sn
398        logger.info(power_cmd)
399        logger.info(hilog_cmd)
400        power_result = sendCmd(power_cmd)
401        logger.info(power_result)
402        if not power_result:
403            return False
404        number = 0
405        while "Set Mode Success" not in power_result and number < 30:
406            time.sleep(4)
407            power_result = sendCmd(power_cmd)
408            logger.info(power_result)
409            number += 1
410            if number >= 20:
411                logger.error("Set mode failed")
412                return False
413        hilog_result = sendCmd(hilog_cmd)
414        logger.info(hilog_result)
415        return True
416    except Exception as e:
417        logger.error(e)
418        return False
419
420
421@timeout(3600)
422def cmd_test(screenshot_path, py_path, new_report_path, resource_path, sn, test_num, pr_url):
423    save_screenshot_path = os.path.join(new_report_path, "screenshot_result")
424    logger.info(save_screenshot_path)
425    time_sleep = random.randint(1, 5)
426    time.sleep(time_sleep)
427    try:
428        if not os.path.exists(save_screenshot_path):
429            os.mkdir(save_screenshot_path)
430        logger.info(save_screenshot_path)
431        base_screenshot_path = os.path.join(new_report_path, "screenshot_base")
432        if not os.path.exists(base_screenshot_path):
433            os.mkdir(base_screenshot_path)
434        logger.info(base_screenshot_path)
435    except Exception as e:
436        logger.error(e)
437        return 98
438    config_path = os.path.join(screenshot_path, "resource", "app_capture_screen_test_config.json")
439    py_cmd = "python %s --config %s --anwser_path %s --save_path %s --device_num %s --test_num %s --tools_path %s --pr_url %s" \
440             % (py_path, config_path, resource_path, save_screenshot_path, sn, test_num, screenshot_path, pr_url)
441    result = outCmd(py_cmd, save_screenshot_path, base_screenshot_path, resource_path)
442    if result == 1:
443        return True
444    if result == 98:
445        return 98
446    if result == 99:
447        return 99
448    else:
449        return False
450
451
452@timeout(3600)
453def outCmd(cmd, save_screenshot_path, base_screenshot_path, resource_path):
454    logger.info("cmd is: %s" % cmd)
455    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk")
456    curline = p.stdout.readline()
457    list_png_name = []
458    try:
459        while "End of check" not in curline:
460            curline = p.stdout.readline()
461            logger.info(curline)
462            if "abnarmal" in curline:
463                png_name = curline.split(" ")[3].split(".")[0]
464                list_png_name.append(png_name)
465            if "SmokeTest find some fatal problems" in curline:
466                logger.error("SmokeTest find some fatal problems!")
467                return 99
468    except Exception as e:
469        logger.error(e)
470        logger.error("execute smoke_test.py failed!")
471        return 99
472    l = list(set(list_png_name))
473    if l:
474        logger.error(l)
475    try:
476        for i in l:
477            result = os.path.join(resource_path, "%s.png" % i)
478            base = os.path.join(base_screenshot_path, "%s.png" % i)
479            shutil.copy(result, base)
480    except Exception as t:
481        logger.info(t)
482    p.wait()
483    logger.info("p.returncode %s" % p.returncode)
484    if p.returncode == 0:
485        logger.info("screenshot check is ok!")
486        return True
487    if p.returncode == 101:
488        logger.error("device disconnection, please check the device!")
489        return False
490    logger.error("screenshot test failed, check the %s" % save_screenshot_path)
491    return 98
492
493
494@timeout(1000)
495def exec_cmd(mini_path, sn, save_path, archive_path):
496    cmd = "python %s --device_num %s --save_path %s --archive_path %s" % (mini_path, sn, save_path, archive_path)
497    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="gbk")
498    curline = p.stdout.readline()
499    try:
500        while "End of check" not in curline:
501            curline = p.stdout.readline()
502            logger.info(curline)
503    except Exception as e:
504        logger.error(e)
505    p.wait()
506    logger.info("p.returncode %s" % p.returncode)
507    if p.returncode == 0:
508        logger.info("mini_system_test is ok!")
509        return True
510    logger.error("mini_system_test failed!")
511    return 98