#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import time
import os
import sys

from core.constants import SchedulerType
from xdevice import Plugin
from xdevice import get_plugin
from xdevice import platform_logger
from xdevice import Scheduler
from xdevice import DeviceTestType
from core.utils import get_build_output_path
from core.utils import scan_support_product
from core.utils import is_lite_product
from core.common import is_open_source_product
from core.command.parameter import Parameter
from core.testcase.testcase_manager import TestCaseManager
from core.config.config_manager import UserConfigManager
from core.config.parse_parts_config import ParsePartsConfig

LOG = platform_logger("Run")


class Run(object):
    """Implementation of the ``run`` command.

    Validates the parsed options, optionally builds the test cases, collects
    the test files and then either executes the distributed python tests
    directly or hands the work to the scheduler plugin.
    """

    def process_command_run(self, command, options):
        """Validate ``options``, prepare the test files and execute them.

        Every failure is reported through ``LOG.error`` and ends the call
        early; the method always returns ``None``.

        :param command: command object passed through unchanged to the
            scheduler plugin's ``exec_command``.
        :param options: parsed options object. It is read (``testtype``,
            ``productform``, ``subsystem``, ``testpart``, ...) and also
            updated in place (``testtype``, ``partname_list``,
            ``coverage_outpath`` and, on the scheduler path, ``testdict``,
            ``target_outpath``, ``testcases_path``, ``resource_path``).
        """
        para = Parameter()
        test_type_list = para.get_testtype_list(options.testtype)
        if not test_type_list:
            LOG.error("The testtype parameter is incorrect.")
            return
        options.testtype = test_type_list

        parser = ParsePartsConfig(options.productform)
        options.partname_list = parser.get_part_list(
            options.subsystem,
            options.testpart)
        options.coverage_outpath = self.get_coverage_outpath(options)

        self._log_input_parameters(options)

        if not para.check_run_parameter(options):
            LOG.error("Input parameter is incorrect.")
            return

        if not self._build_test_cases(options):
            LOG.error("Build test cases failed.")
            return

        test_case_path = self.get_tests_out_path(options.productform)
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return

        test_dict = TestCaseManager().get_test_files(test_case_path, options)
        if not self._check_test_dictionary(test_dict):
            LOG.error("The test file list is empty.")
            return

        # The distributed path is taken only when "distributedtest" is the
        # single requested test type; everything else goes to the scheduler.
        if ("distributedtest" in options.testtype and
                len(options.testtype) == 1):
            self._run_distributed_tests(test_dict)
        else:
            self._run_scheduler_tests(command, options, test_dict)
        return

    @staticmethod
    def _log_input_parameters(options):
        """Write the effective input parameters to the log."""
        LOG.info("")
        LOG.info("------------------------------------")
        LOG.info("Input parameter:")
        LOG.info("productform = %s" % options.productform)
        LOG.info("testtype = %s" % str(options.testtype))
        LOG.info("subsystem = %s" % str(options.subsystem))
        LOG.info("testpart = %s" % str(options.testpart))
        LOG.info("testmodule = %s" % options.testmodule)
        LOG.info("testsuit = %s" % options.testsuit)
        LOG.info("testcase = %s" % options.testcase)
        LOG.info("testlevel = %s" % options.testlevel)
        LOG.info("partname_list = %s" % str(options.partname_list))
        LOG.info("------------------------------------")
        LOG.info("")

    @staticmethod
    def _run_distributed_tests(test_dict):
        """Execute every file listed under ``test_dict["PYT"]``.

        Creates ``reports/<timestamp>/log`` and ``reports/<timestamp>/temp``
        below ``sys.framework_root_dir``, runs the files one by one and then
        generates the reports.

        :param test_dict: mapping returned by
            ``TestCaseManager.get_test_files``; only the ``"PYT"`` entry is
            used here.
        """
        # Imported lazily, as in the original code, so the distributed
        # helpers are only loaded when this path is actually taken.
        from core.command.distribute_utils \
            import check_ditributetest_environment
        from core.command.distribute_utils import make_device_info_file
        from core.command.distribute_utils \
            import execute_distribute_test_file
        from core.command.distribute_utils import make_reports

        # One timestamp, two renderings: a path-safe one for the directory
        # name and a readable one handed to the report generator.
        local_time = time.localtime()
        create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
        start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)

        if not check_ditributetest_environment():
            return

        # NOTE(review): framework_root_dir is not a standard attribute of
        # the sys module; it is presumably attached by the framework entry
        # point before this command runs - confirm.
        result_rootpath = os.path.join(sys.framework_root_dir,
                                       "reports",
                                       create_time)
        print(result_rootpath)

        log_path = os.path.join(result_rootpath, "log")
        tmp_path = os.path.join(result_rootpath, "temp")
        os.makedirs(log_path, exist_ok=True)
        os.makedirs(tmp_path, exist_ok=True)

        Scheduler.start_task_log(log_path)
        try:
            make_device_info_file(tmp_path)

            pyfile_list = test_dict["PYT"]
            total = len(pyfile_list)
            for index, element in enumerate(pyfile_list):
                LOG.info("[%s / %s] Executing: %s" % (index + 1,
                                                      total, element))
                execute_distribute_test_file(element, result_rootpath)

            make_reports(result_rootpath, start_time)
        finally:
            # Always stop the task log that was started above, even when a
            # test file or the report generation raises.
            Scheduler.stop_task_log()

    def _run_scheduler_tests(self, command, options, test_dict):
        """Hand the collected test files to the scheduler plugin.

        :param command: forwarded unchanged to ``scheduler.exec_command``.
        :param options: parsed options; ``testdict`` and ``target_outpath``
            are set here, plus ``testcases_path`` and ``resource_path`` for
            lite products.
        :param test_dict: mapping of collected test files.
        """
        options.testdict = test_dict
        options.target_outpath = self.get_target_out_path(
            options.productform)

        # Indexing the lookup result directly would raise IndexError when no
        # scheduler plugin is registered, bypassing the error report below.
        plugins = get_plugin(plugin_type=Plugin.SCHEDULER,
                             plugin_id=SchedulerType.SCHEDULER)
        scheduler = plugins[0] if plugins else None
        if scheduler is None:
            LOG.error("Can not find the scheduler plugin.")
            return

        # NOTE(review): source_code_root_path / framework_root_dir are
        # custom attributes expected on the sys module - confirm where they
        # are assigned.
        if is_lite_product(options.productform,
                           sys.source_code_root_path):
            options.testcases_path = options.target_outpath
            options.resource_path = os.path.abspath(os.path.join(
                sys.framework_root_dir, "..", "resource"))
            print(options.testcases_path)
            print(options.resource_path)
            if "wifiiot" in options.productform:
                # Map both the ".bin" test type and the "BIN" ext type to
                # the ctest_lite device test type for wifiiot products.
                scheduler.update_test_type_in_source(
                    ".bin", DeviceTestType.ctest_lite)
                scheduler.update_ext_type_in_source(
                    "BIN", DeviceTestType.ctest_lite)
            else:
                print("productform is not wifiiot")
        scheduler.exec_command(command, options)

    ##############################################################
    ##############################################################

    @classmethod
    def get_target_out_path(cls, product_form):
        """Return the absolute output directory for ``product_form``.

        Uses the test-case directory from the user configuration when it is
        not empty; otherwise ``<build output>/packages/<product_form>``.

        :param product_form: product form name.
        :return: absolute path string.
        """
        target_out_path = UserConfigManager().get_test_cases_dir()
        if target_out_path == "":
            target_out_path = os.path.join(
                get_build_output_path(product_form),
                "packages",
                product_form)
        return os.path.abspath(target_out_path)

    @classmethod
    def _build_test_cases(cls, options):
        """Build the test cases when the user configuration asks for it.

        :param options: parsed options; ``options.coverage`` disables the
            build step.
        :return: ``True`` when no build is needed, otherwise the result of
            ``BuildManager.build_testcases``.
        """
        if options.coverage:
            LOG.info("Coverage testing, no need to compile testcases")
            return True

        is_build_testcase = UserConfigManager().get_user_config_flag(
            "build", "testcase")
        project_root_path = sys.source_code_root_path
        if not is_build_testcase or project_root_path == "":
            return True

        # Imported lazily, as in the original code.
        from core.build.build_manager import BuildManager
        return BuildManager().build_testcases(project_root_path, options)

    @classmethod
    def _check_test_dictionary(cls, test_dictionary):
        """Return ``True`` if at least one file list in the mapping is
        non-empty.

        :param test_dictionary: mapping whose values are sized collections
            of test files.
        :return: ``bool``; ``False`` for an empty mapping.
        """
        return any(len(file_list) > 0
                   for file_list in test_dictionary.values())

    @classmethod
    def get_tests_out_path(cls, product_form):
        """Return the directory expected to contain the built test cases.

        Uses the test-case directory from the user configuration when it is
        not empty. Otherwise the path is derived from the build output path:
        ``packages/phone/tests`` for supported open-source products,
        ``packages/<product_form>/tests`` for other supported products and
        ``test`` for products not returned by ``scan_support_product``.

        :param product_form: product form name.
        :return: path string (absolute only for the supported-product
            branches).
        """
        testcase_path = UserConfigManager().get_test_cases_dir()
        if testcase_path == "":
            build_output_path = get_build_output_path(product_form)
            if product_form in scan_support_product():
                if is_open_source_product(product_form):
                    package_name = "phone"
                else:
                    package_name = product_form
                testcase_path = os.path.abspath(os.path.join(
                    build_output_path,
                    "packages",
                    package_name,
                    "tests"))
            else:
                testcase_path = os.path.join(build_output_path, "test")
        LOG.info("testcase_path=%s" % testcase_path)
        return testcase_path

    @classmethod
    def get_coverage_outpath(cls, options):
        """Return the coverage output path, or ``""`` when not applicable.

        When ``options.coverage`` is set, the build output path of
        ``options.productform`` is used; if that is empty, the ``outpath``
        entry of the ``coverage`` user configuration is used instead. An
        error is logged when both are empty.

        :param options: parsed options (``coverage``, ``productform``).
        :return: path string, possibly empty.
        """
        coverage_out_path = ""
        if options.coverage:
            coverage_out_path = get_build_output_path(options.productform)
            if coverage_out_path == "":
                coverage_out_path = UserConfigManager().get_user_config(
                    "coverage").get("outpath", "")
            if coverage_out_path == "":
                LOG.error("Coverage test: coverage_outpath is empty.")
        return coverage_out_path