1#!/usr/bin/python3.4 2# coding=utf-8 3# 4 5# Copyright (C) 2016 Huawei Technologies Co., HUTAF xDevice 6# 7# Licensed under the Apache License, Version 2.0 (the "License"); you may not 8# use this file except in compliance with the License. You may obtain a copy of 9# the License at 10# 11# http://www.apache.org/licenses/LICENSE-2.0 12# 13# Unless required by applicable law or agreed to in writing, software 14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 16# License for the specific language governing permissions and limitations under 17# the License. 18 19import copy 20import sys 21import os 22import traceback 23 24from xdevice import ConfigConst 25from xdevice import get_cst_time 26from xdevice import get_file_absolute_path 27from xdevice import LifeStage 28from xdevice import Scheduler 29 30from devicetest.core.constants import RunStatus 31from devicetest.core.constants import RunResult 32from devicetest.core.constants import FileAttribute 33from devicetest.core.variables import CurCase 34from devicetest.core.variables import DeccVariable 35from devicetest.core.variables import ProjectVariables 36from devicetest.log.logger import DeviceTestLog 37from devicetest.report.generation import add_log_caching_handler 38from devicetest.report.generation import del_log_caching_handler 39from devicetest.report.generation import get_caching_logs 40from devicetest.report.generation import generate_report 41from devicetest.utils.util import calculate_execution_time 42from devicetest.utils.util import get_base_name 43from devicetest.utils.util import get_dir_path 44from devicetest.utils.util import import_from_file 45 46suite_flag = None 47 48 49class TestSuite(object): 50 """Base class for all test classes to inherit from. 51 52 This class gets all the controller objects from test_runner and executes 53 the test cases requested within itself. 54 55 """ 56 57 def __init__(self, configs, path): 58 self.configs = configs 59 self.devices = [] 60 self.device1 = None 61 self.device2 = None 62 # 透传的参数 63 self.pass_through = None 64 self.set_devices(self.configs["devices"]) 65 self.path = path 66 self.log = self.configs["log"] 67 self.error_msg = '' 68 self.case_list = [] 69 self.case_result = dict() 70 self.suite_name = self.configs.get("suite_name") 71 # 白名单用例 72 self.white_case_list = [] 73 # 黑名单用例 74 self.black_case_list = [] 75 # 初始化透传参数的列表 76 self.arg_list = dict() 77 self.app_result_info = dict() 78 self._test_args_para_parse(self.configs["testargs"]) 79 # 往DeviceTest的用例中注入logger并防止重复初始化测试套级别的变量 80 self.inject_logger = None 81 self.cur_case = None 82 # device log 83 self.device_log = dict() 84 self.hilog = dict() 85 self.log_proc = dict() 86 self.hilog_proc = dict() 87 88 self.suite_case_results = [] 89 self.suite_report_path = "" 90 self._case_log_buffer_hdl = None 91 92 # device录屏截图属性 93 self.devices_media = dict() 94 95 def __enter__(self): 96 return self 97 98 def __exit__(self, *args): 99 pass 100 101 def _device_close(self): 102 self.log.debug("Start device close") 103 for device in self.devices: 104 device.close() 105 self.log.debug("Finish device close.") 106 107 def run(self): 108 self._init_devicetest() 109 start_time = get_cst_time() 110 # 开始收集测试套(setup和teardown)的运行日志 111 suite_log_buffer_hdl = add_log_caching_handler() 112 try: 113 self.cur_case.set_suite_instance(self) 114 # 1.先判断是否在json中指定,否则先收集当前文件夹下所有testcase得到run_list 115 self.case_list = self._get_case_list(self.path) 116 self.log.debug("Execute test case list: {}".format(self.case_list)) 117 # 2.先执行self.setup 118 if RunStatus.FINISHED == self.run_setup(): 119 return 120 121 # 在运行测试套子用例前,停止收集测试套(setup和teardown)的运行日志 122 del_log_caching_handler(suite_log_buffer_hdl) 123 # 3.依次执行所有的run_list 124 # 开始收集测试套子用例的运行日志 125 self._case_log_buffer_hdl = add_log_caching_handler() 126 total_case_num = len(self.case_list) 127 case_num = 1 128 for case in self.case_list: 129 self.log.info("[{} / {}] Executing suite case: {}".format(case_num, total_case_num, case)) 130 self.run_one_test_case(case) 131 case_num = case_num + 1 132 133 # 停止收集测试套子用例的运行日志 134 del_log_caching_handler(self._case_log_buffer_hdl) 135 self._case_log_buffer_hdl = None 136 # 在运行测试套子用例前,重新开始收集测试套(setup和teardown)的运行日志 137 add_log_caching_handler(buffer_hdl=suite_log_buffer_hdl) 138 finally: 139 # 4.执行self.teardown 140 self.run_teardown() 141 self.cur_case.set_suite_instance(None) 142 143 steps = self.cur_case.get_steps_info() 144 self.log.info(f"test suite steps: {steps}") 145 # 停止收集测试套(setup和teardown)的运行日志 146 del_log_caching_handler(suite_log_buffer_hdl) 147 if suite_log_buffer_hdl is None: 148 return 149 # 生成测试套的报告 150 self.log.info("generate suite report") 151 end_time = get_cst_time() 152 suite_info = { 153 "name": self.suite_name, 154 "result": "", 155 "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"), 156 "end": end_time.strftime("%Y-%m-%d %H:%M:%S"), 157 'elapsed': calculate_execution_time(start_time, end_time), 158 "error": "", 159 "logs": get_caching_logs(suite_log_buffer_hdl), 160 "cases": self.suite_case_results 161 } 162 report_path = os.path.join("details", self.suite_name, self.suite_name + ".html") 163 to_file = os.path.join(self.get_case_report_path(), report_path) 164 generate_report(to_file, template="suite.html", suite=suite_info) 165 del suite_log_buffer_hdl 166 167 # 往结果xml添加测试套的报告路径 168 self.suite_report_path = report_path 169 steps.clear() 170 DeccVariable.reset() 171 172 def setup(self): 173 """Setup function that will be called before executing any test suite. 174 Implementation is optional. 175 """ 176 pass 177 178 def setup_start(self): 179 """ 180 setup_start function that will be called after setup function. 181 Implementation is optional. 182 """ 183 pass 184 185 def setup_end(self): 186 """ 187 setup_end function that will be called after setup function. 188 Implementation is optional. 189 """ 190 pass 191 192 def teardown(self): 193 """Teardown function that will be called after all the selected test 194 suite. 195 Implementation is optional. 196 """ 197 pass 198 199 def teardown_start(self): 200 """ 201 teardown_start function that will be called before Teardown function. 202 Implementation is optional. 203 """ 204 pass 205 206 def teardown_end(self): 207 """ 208 teardown_end function that will be called after Teardown function. 209 Implementation is optional. 210 """ 211 pass 212 213 def get_params(self): 214 return self.arg_list 215 216 def set_devices(self, devices): 217 self.devices = devices 218 if not devices: 219 return 220 221 try: 222 num = 1 223 224 for _, _ad in enumerate(self.devices): 225 if not hasattr(_ad, "device_id"): 226 setattr(_ad, "device_id", "device{}".format(num)) 227 # 兼容release2 增加id、serial 228 setattr(_ad, "id", _ad.device_id) 229 setattr(_ad, "serial", _ad.device_sn) 230 setattr(self, _ad.device_id, _ad) 231 setattr(self, "device{}".format(num), _ad) 232 num += 1 233 except Exception as error: 234 self.log.error("Failed to initialize the device object in the " 235 "TestCase.", error_no="01218") 236 raise error 237 238 def _get_case_list(self, path): 239 result = [] 240 if len(self.configs["suitecases"]) > 0: 241 for _, case in enumerate(self.configs["suitecases"]): 242 if os.path.exists(case): 243 case_path = case 244 else: 245 case_path = get_file_absolute_path(case, [path, 246 self.configs["resource_path"], 247 self.configs["testcases_path"]]) 248 result.append(case_path) 249 else: 250 all_file_list = os.listdir(path) 251 # 遍历该文件夹下的所有目录或者文件 252 for file in all_file_list: 253 filepath = os.path.join(path, file) 254 # 如果是文件夹,递归调用函数 255 if os.path.isdir(filepath): 256 result.extend(self._get_case_list(filepath)) 257 # 如果不是文件夹,保存文件路径及文件名 258 elif os.path.isfile(filepath) and \ 259 "__pycache__" not in filepath: 260 if file.startswith(FileAttribute.TESTCASE_PREFIX) and \ 261 (file.endswith(FileAttribute.TESTCASE_POSFIX_PY) or 262 file.endswith(FileAttribute.TESTCASE_POSFIX_PYC) or 263 file.endswith(FileAttribute.TESTCASE_POSFIX_PYD)): 264 result.append(filepath) 265 return result 266 267 def _exec_func(self, func, *args): 268 result = False 269 try: 270 func(*args) 271 except Exception as exception: 272 self.error_msg = "{}:{}".format(str(exception), traceback.format_exc()) 273 self.log.error("run case error! Exception: {}".format(self.error_msg)) 274 else: 275 result = True 276 return result 277 278 def run_setup(self): 279 self.setup_start() 280 self.log.info("**********SetUp Starts!") 281 ret = self._exec_func(self.setup) 282 self.log.info("**********SetUp Ends!") 283 if ret: 284 self.setup_end() 285 return RunStatus.RUNNING 286 self.log.info("SetUp Failed!") 287 return RunStatus.FINISHED 288 289 def run_one_test_case(self, case_path): 290 case_name = get_base_name(case_path) 291 if (self.black_case_list and case_name in self.black_case_list) \ 292 or (self.white_case_list and case_name not in self.white_case_list): 293 self.log.warning("case name {} is in black list or not in white list, ignored".format(case_name)) 294 return 295 start_time = get_cst_time() 296 case_result = RunResult.FAILED 297 error_msg = "" 298 test_cls_instance = None 299 try: 300 test_cls = import_from_file(get_dir_path(case_path), case_name) 301 self.log.info("Success to import {}.".format(case_name)) 302 self._compatible_testcase(case_path, case_name) 303 with test_cls(self.configs) as test_cls_instance: 304 self.cur_case.set_case_instance(test_cls_instance) 305 test_cls_instance.run() 306 case_result, error_msg = test_cls_instance.result, test_cls_instance.error_msg 307 except Exception as e: 308 self.log.debug(traceback.format_exc()) 309 self.log.error("run case error! Exception: {}".format(e)) 310 finally: 311 if test_cls_instance is None: 312 case_result = RunResult.BLOCKED 313 314 Scheduler.call_life_stage_action(stage=LifeStage.case_end, 315 case_name=case_name, 316 case_result=case_result) 317 318 end_time = get_cst_time() 319 cost = int(round((end_time - start_time).total_seconds() * 1000)) 320 self.case_result[case_name] = { 321 "result": case_result, "error": error_msg, 322 "run_time": cost, "report": ""} 323 self.log.info("Executed case: {}, result: {}, cost time: {}".format( 324 case_name, test_cls_instance.result, cost)) 325 if test_cls_instance: 326 try: 327 del test_cls_instance 328 self.log.debug("del test case instance success.") 329 except Exception as e: 330 self.log.debug(traceback.format_exc()) 331 self.log.warning("del test case instance exception." 332 " Exception: {}".format(e)) 333 self._device_close() 334 335 if self._case_log_buffer_hdl is None: 336 return 337 # 生成子用例的报告 338 end_time = get_cst_time() 339 extra_head = self.cur_case.get_steps_extra_head() 340 steps = self.cur_case.get_steps_info() 341 base_info = { 342 "name": case_name, 343 "result": case_result, 344 "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"), 345 "end": end_time.strftime("%Y-%m-%d %H:%M:%S"), 346 'elapsed': calculate_execution_time(start_time, end_time), 347 "error": error_msg, 348 "extra_head": extra_head, 349 "steps": steps 350 } 351 case_info = copy.copy(base_info) 352 case_info["logs"] = copy.copy(get_caching_logs(self._case_log_buffer_hdl)) 353 case_html = case_name + ".html" 354 report_path = os.path.join("details", self.suite_name, case_html) 355 to_path = os.path.join(self.configs.get("report_path"), report_path) 356 generate_report(to_path, case=case_info) 357 base_info["report"] = case_html 358 self.suite_case_results.append(base_info) 359 # 清空日志缓存 360 self._case_log_buffer_hdl.buffer.clear() 361 steps.clear() 362 # 往结果xml添加子用例的报告路径 363 self.case_result[case_name]["report"] = report_path 364 # 将用例实例对象和用例名置为空 365 self.cur_case.set_case_instance(None) 366 self.cur_case.set_name("") 367 368 def run_teardown(self): 369 self.log.info("**********TearDown Starts!") 370 self.teardown_start() 371 self._exec_func(self.teardown) 372 self.teardown_end() 373 self.log.info("**********TearDown Ends!") 374 375 def _test_args_para_parse(self, paras): 376 paras = dict(paras) 377 for para_name in paras.keys(): 378 para_name = para_name.strip() 379 para_values = paras.get(para_name, []) 380 if para_name == "class": 381 self.white_case_list.extend(para_values) 382 elif para_name == "notClass": 383 self.black_case_list.extend(para_values) 384 elif para_name == "para": 385 for arg in para_values: 386 key, value = arg.split("#") 387 self.arg_list[key] = value 388 elif para_name == "deveco_planet_info": 389 for app_info in para_values: 390 key, value = app_info.split("#") 391 if key == "task_type": 392 setattr(sys, "category", value) 393 else: 394 self.app_result_info[key] = value 395 setattr(sys, "app_result_info", self.app_result_info) 396 elif para_name == ConfigConst.pass_through: 397 self.pass_through = para_values 398 else: 399 continue 400 401 self.configs["pass_through"] = self.pass_through 402 self.configs["arg_list"] = self.arg_list 403 404 def get_case_report_path(self): 405 return self.configs["report_path"] 406 407 def _compatible_testcase(self, case_path, case_name): 408 DeccVariable.cur_case().set_name(case_name) 409 project_var = ProjectVariables(self.inject_logger) 410 project_var.execute_case_name = case_name 411 project_var.cur_case_full_path = case_path 412 project_var.task_report_dir = self.get_case_report_path() 413 self.configs["project"] = project_var 414 415 def _init_devicetest(self): 416 DeviceTestLog.set_log(self.log) 417 self.cur_case = CurCase(self.log) 418 self.cur_case.suite_name = self.suite_name 419 self.cur_case.set_case_screenshot_dir(None, self.get_case_report_path(), None) 420 DeccVariable.set_cur_case_obj(self.cur_case) 421