#!/usr/bin/python3.4
# coding=utf-8
#
# Copyright (C) 2016 Huawei Technologies Co., HUTAF xDevice
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import copy
import sys
import os
import traceback
from typing import List, Tuple
from xdevice import ConfigConst
from xdevice import calculate_elapsed_time
from xdevice import get_cst_time
from xdevice import get_file_absolute_path
from xdevice import FilePermission
from xdevice import CaseEnd
from xdevice import Binder
from xdevice import Variables
from devicetest.core.constants import RunResult
from devicetest.core.constants import FileAttribute
from devicetest.core.test_case import UpdateStep
from devicetest.core.variables import CurCase
from devicetest.core.variables import DeccVariable
from devicetest.core.variables import ProjectVariables
from devicetest.error import ErrorMessage
from devicetest.report.generation import add_log_caching_handler
from devicetest.report.generation import del_log_caching_handler
from devicetest.report.generation import get_caching_logs
from devicetest.report.generation import generate_report
from devicetest.utils.util import get_base_name
from devicetest.utils.util import get_dir_path
from devicetest.utils.util import import_from_file
class TestSuite:
"""Base class for all test classes to inherit from.
This class gets all the controller objects from test_runner and executes
the test cases requested within itself.
"""
def __init__(self, configs, path):
self.configs = configs
self.devices = []
self.device1 = None
self.device2 = None
# 透传的参数
self.pass_through = Variables.config.pass_through
self.set_devices(self.configs["devices"])
self.path = path
self.log = self.configs["log"]
self.error_msg = ''
self.trace_info = ''
self.case_list: List[Tuple[str, str]] = []
self.case_result = dict()
self.suite_name = self.configs.get("suite_name")
# 白名单用例
self.white_case_list = []
# 黑名单用例
self.black_case_list = []
# 初始化透传参数的列表
self.arg_list = dict()
self.app_result_info = dict()
self._test_args_para_parse(self.configs["testargs"])
# 往DeviceTest的用例中注入logger并防止重复初始化测试套级别的变量
self.inject_logger = None
self.cur_case = None
# device log
self.device_log = dict()
self.hilog = dict()
self.log_proc = dict()
self.hilog_proc = dict()
self.suite_case_results = []
self.suite_report_path = ""
self._case_log_buffer_hdl = None
# device录屏截图属性
self.devices_media = dict()
self._repeat = self.configs.get("request").config.repeat
self._repeat_round = self.configs.get("request").get_repeat_round()
self._round_folder = f"round{self._repeat_round}" if self._repeat > 1 else ""
def __enter__(self):
return self
def __exit__(self, *args):
pass
def _device_close(self):
self.log.debug("Start device close")
for device in self.devices:
device.close()
self.log.debug("Finish device close.")
def run(self):
self._init_devicetest()
report_path = os.path.join("details", self._round_folder, self.suite_name, self.suite_name + ".html")
start_time = get_cst_time()
# 记录录屏和截图属性
# self._get_screenrecorder_and_screenshot()
# 开始收集测试套(setup和teardown)的运行日志
suite_log_buffer_hdl = add_log_caching_handler()
try:
self.cur_case.set_suite_instance(self)
# 1.先判断是否在json中指定,否则先收集当前文件夹下所有testcase得到run_list
for case_path in self._get_case_list(self.path):
case_name = get_base_name(case_path)
if (self.black_case_list and case_name in self.black_case_list) \
or (self.white_case_list and case_name not in self.white_case_list):
self.log.warning("case name {} is in black list or not in white list, ignored".format(case_name))
continue
self.case_list.append((case_name, case_path))
self.log.debug("Execute test case list: {}".format(self.case_list))
# 2.先执行self.setup
if self.run_setup():
# 在运行测试套子用例前,停止收集测试套setup步骤的运行日志
del_log_caching_handler(suite_log_buffer_hdl)
# 3.依次执行所有的run_list
# 开始收集测试套子用例的运行日志
self._case_log_buffer_hdl = add_log_caching_handler()
total_case_num = len(self.case_list)
for index, case in enumerate(self.case_list, 1):
# self._reset_screenrecorder_and_screenshot()
self.log.info("[{} / {}] Executing suite case: {}".format(index, total_case_num, case[1]))
self.run_one_test_case(case)
# 停止收集测试套子用例的运行日志
del_log_caching_handler(self._case_log_buffer_hdl)
else:
self.error_msg = ErrorMessage.TestCase.Code_0203017.format(self.error_msg)
for case in self.case_list:
self.case_result[case[0]] = {
"result": RunResult.BLOCKED,
"error": self.error_msg,
"run_time": 0,
"report": report_path
}
self._case_log_buffer_hdl = None
# 在运行测试套子用例后,重新开始收集测试套teardown步骤的运行日志
add_log_caching_handler(buffer_hdl=suite_log_buffer_hdl)
finally:
# 4.执行self.teardown
self.run_teardown()
self.cur_case.set_suite_instance(None)
steps = self.cur_case.get_steps_info()
# 停止收集测试套(setup和teardown)的运行日志
del_log_caching_handler(suite_log_buffer_hdl)
if suite_log_buffer_hdl is None:
return
# 生成测试套的报告
self.log.info("generate suite report")
end_time = get_cst_time()
environment = self.configs.get("request").config.environment
suite_info = {
"name": self.suite_name,
"result": "",
"begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
'elapsed': calculate_elapsed_time(start_time, end_time),
"error": "",
"logs": "",
"subcases": self.suite_case_results,
"devices": [] if environment is None else environment.get_description(),
"steps": steps
}
log_content = {
"content": get_caching_logs(suite_log_buffer_hdl)
}
to_file = os.path.join(self.get_case_report_path(), report_path)
generate_report(to_file, case=suite_info, logs=log_content)
del suite_log_buffer_hdl
# 往结果xml添加测试套的报告路径
self.suite_report_path = report_path
steps.clear()
DeccVariable.reset()
def setup(self):
"""Setup function that will be called before executing any test suite.
Implementation is optional.
"""
pass
def setup_start(self):
"""
setup_start function that will be called after setup function.
Implementation is optional.
"""
pass
def setup_end(self):
"""
setup_end function that will be called after setup function.
Implementation is optional.
"""
pass
def teardown(self):
"""Teardown function that will be called after all the selected test
suite.
Implementation is optional.
"""
pass
def teardown_start(self):
"""
teardown_start function that will be called before Teardown function.
Implementation is optional.
"""
pass
def teardown_end(self):
"""
teardown_end function that will be called after Teardown function.
Implementation is optional.
"""
pass
def get_params(self):
return self.arg_list
def set_devices(self, devices):
self.devices = devices
if not devices:
return
try:
for num, _ad in enumerate(self.devices, 1):
if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"):
setattr(_ad, "device_id", "device{}".format(num))
# 兼容release2 增加id、serial
setattr(_ad, "id", _ad.device_id)
setattr(_ad, "serial", _ad.device_sn)
setattr(self, _ad.device_id, _ad)
setattr(self, "device{}".format(num), _ad)
except Exception as error:
self.log.error("Failed to initialize the device object in the "
"TestCase.", error_no="01218")
raise error
def _get_case_list(self, path):
result = []
if len(self.configs["suitecases"]) > 0:
for _, case in enumerate(self.configs["suitecases"]):
if os.path.exists(case):
case_path = case
else:
case_path = get_file_absolute_path(case, [path,
self.configs["resource_path"],
self.configs["testcases_path"]])
result.append(case_path)
else:
all_file_list = os.listdir(path)
# 遍历该文件夹下的所有目录或者文件
for file in all_file_list:
filepath = os.path.join(path, file)
# 如果是文件夹,递归调用函数
if os.path.isdir(filepath):
result.extend(self._get_case_list(filepath))
# 如果不是文件夹,保存文件路径及文件名
elif os.path.isfile(filepath) and \
"__pycache__" not in filepath:
if file.startswith(FileAttribute.TESTCASE_PREFIX) and \
(file.endswith(FileAttribute.TESTCASE_POSFIX_PY) or
file.endswith(FileAttribute.TESTCASE_POSFIX_PYC) or
file.endswith(FileAttribute.TESTCASE_POSFIX_PYD)):
result.append(filepath)
return result
def _exec_func(self, func, *args):
result = False
try:
func(*args)
except Exception as exception:
self.error_msg = str(exception)
self.trace_info = traceback.format_exc()
index = self.cur_case.step_index
if index == -1:
self.log.error(self.error_msg)
self.log.error(self.trace_info)
else:
step_error_id = f'step_error_{index}'
self.log.error(f'{self.error_msg}')
self.log.error(self.trace_info)
_error = f'{self.error_msg}'
UpdateStep(index, error=_error)
else:
result = True
return result
def run_setup(self):
self.setup_start()
self.log.info("**********SetUp Starts!")
ret = self._exec_func(self.setup)
self.log.info("**********SetUp Ends!")
if ret:
self.setup_end()
return True
self.log.info("SetUp Failed!")
return False
def run_one_test_case(self, case: Tuple[str, str]):
case_name, case_path = case[0], case[1]
start_time = get_cst_time()
case_result = RunResult.FAILED
test_cls_instance = None
result_content = None # 用例测试结果的拓展内容
try:
test_cls = import_from_file(get_dir_path(case_path), case_name)
self.log.info("Success to import {}.".format(case_name))
self._compatible_testcase(case_path, case_name)
with test_cls(self.configs) as test_cls_instance:
self.cur_case.set_case_instance(test_cls_instance)
test_cls_instance.run()
case_result, error_msg = test_cls_instance.result, test_cls_instance.error_msg
result_content = getattr(test_cls_instance, "result_content", None)
except Exception as e:
error_msg = str(e)
self.log.error("run case error! Exception: {}".format(e))
self.log.error(traceback.format_exc())
if test_cls_instance is None:
case_result = RunResult.BLOCKED
if test_cls_instance:
try:
del test_cls_instance
self.log.debug("del test case instance success")
except Exception as e:
self.log.debug(traceback.format_exc())
self.log.warning("del test case instance exception. Exception: {}".format(e))
Binder.notify_stage(CaseEnd(case_name, case_result))
end_time = get_cst_time()
cost = int(round((end_time - start_time).total_seconds() * 1000))
self.log.info("Executed case: {}, result: {}, cost time: {}ms".format(case_name, case_result, cost))
self.case_result[case_name] = {
"result": case_result, "error": error_msg,
"run_time": cost, "report": "", "result_content": result_content}
try:
self._device_close()
except Exception as e:
self.log.error("stop catch device log error! {}".format(e))
self.log.debug(traceback.format_exc())
if self._case_log_buffer_hdl is None:
return
# 生成子用例的报告
steps = self.cur_case.get_steps_info()
base_info = {
"name": case_name,
"result": case_result,
"begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
'elapsed': calculate_elapsed_time(start_time, end_time),
"error": error_msg
}
case_info = copy.copy(base_info)
case_info.update({
"logs": "",
"devices": [],
"steps": steps
})
log_content = {
"content": copy.copy(get_caching_logs(self._case_log_buffer_hdl))
}
case_html = case_name + ".html"
report_path = os.path.join("details", self._round_folder, self.suite_name, case_html)
to_file = os.path.join(self.configs.get("report_path"), report_path)
generate_report(to_file, case=case_info, logs=log_content)
base_info["report"] = case_html
self.suite_case_results.append(base_info)
# 清空日志缓存
self._case_log_buffer_hdl.buffer.clear()
steps.clear()
# 往结果xml添加子用例的报告路径
self.case_result[case_name]["report"] = report_path
# 将用例实例对象和用例名置为空
self.cur_case.set_case_instance(None)
self.cur_case.set_name("")
def run_teardown(self):
self.log.info("**********TearDown Starts!")
self.teardown_start()
self._exec_func(self.teardown)
self.teardown_end()
self.log.info("**********TearDown Ends!")
def _test_args_para_parse(self, paras):
paras = dict(paras)
for para_name in paras.keys():
para_name = para_name.strip()
para_values = paras.get(para_name, [])
if para_name == "class":
self.white_case_list.extend(para_values)
elif para_name == "notClass":
self.black_case_list.extend(para_values)
elif para_name == "para":
for arg in para_values:
key, value = arg.split("#")
self.arg_list[key] = value
elif para_name == "deveco_planet_info":
for app_info in para_values:
key, value = app_info.split("#")
if key == "task_type":
setattr(sys, "category", value)
else:
self.app_result_info[key] = value
setattr(sys, "app_result_info", self.app_result_info)
else:
continue
self.configs["pass_through"] = self.pass_through
self.configs["arg_list"] = self.arg_list
def get_case_report_path(self):
return self.configs["report_path"]
def _compatible_testcase(self, case_path, case_name):
DeccVariable.cur_case().set_name(case_name)
project_var = ProjectVariables(self.inject_logger)
project_var.execute_case_name = case_name
project_var.cur_case_full_path = case_path
project_var.task_report_dir = self.get_case_report_path()
self.configs["project"] = project_var
def _init_devicetest(self):
self.cur_case = CurCase(self.log)
self.cur_case.suite_name = self.suite_name
self.cur_case.set_case_screenshot_dir(
None, self.get_case_report_path(), None,
repeat=self._repeat, repeat_round=self._repeat_round)
DeccVariable.set_cur_case_obj(self.cur_case)