1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import time 21 22from _core.constants import ConfigConst 23from _core.context.handler import report_not_executed 24from _core.context.option_util import get_device_options 25from _core.error import ErrorMessage 26from _core.exception import ParamError 27from _core.executor.source import TestSource 28from _core.logger import platform_logger 29from _core.plugin import Config 30from _core.plugin import Plugin 31from _core.plugin import get_plugin 32from _core.testkit.json_parser import JsonParser 33from _core.utils import get_kit_instances 34from _core.utils import get_repeat_round 35 36__all__ = ["Descriptor", "Task", "Request"] 37LOG = platform_logger("Request") 38 39 40class Descriptor: 41 """ 42 The descriptor for a test or suite 43 """ 44 45 def __init__(self, uuid=None, name=None, source=None, container=False, error=None): 46 self.unique_id = uuid 47 self.display_name = name 48 self.tags = {} 49 self.source = source 50 self.parent = None 51 self.children = [] 52 self.container = container 53 self.error = error 54 55 def get_container(self): 56 return self.container 57 58 def get_unique_id(self): 59 return self.unique_id 60 61 62class Task: 63 """ 64 TestTask describes the tree of tests and suites 65 """ 66 EMPTY_TASK = "empty" 67 TASK_CONFIG_SUFFIX = ".json" 68 TASK_CONFIG_DIR = "config" 69 life_stage_listener = None 70 71 def __init__(self, root=None, drivers=None, config=None): 72 self.root = root 73 self.test_drivers = drivers or [] 74 self.config = config or Config() 75 76 def init(self, config): 77 from xdevice import Variables 78 from _core.context.log import RuntimeLogs 79 import uuid 80 start_time = time.localtime(time.time()) 81 LOG.debug("StartTime={}".format(time.strftime("%Y-%m-%d %H:%M:%S", start_time))) 82 83 self.config.update(config.__dict__) 84 Variables.task_id = uuid.uuid4().hex 85 if getattr(config, ConfigConst.report_path, "") == "": 86 Variables.task_name = time.strftime('%Y-%m-%d-%H-%M-%S', start_time) 87 else: 88 Variables.task_name = config.report_path 89 90 # create a report folder to store test report 91 report_path = os.path.join(Variables.exec_dir, 92 Variables.report_vars.report_dir, 93 Variables.task_name) 94 os.makedirs(report_path, exist_ok=True) 95 self._check_report_path(report_path) 96 97 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 98 os.makedirs(log_path, exist_ok=True) 99 100 self.config.kits = [] 101 if getattr(config, "task", ""): 102 task_file = config.task + self.TASK_CONFIG_SUFFIX 103 task_dir = self._get_task_dir(task_file) 104 self._load_task(task_dir, task_file) 105 106 self.config.top_dir = Variables.top_dir 107 self.config.exec_dir = Variables.exec_dir 108 self.config.report_path = report_path 109 self.config.log_path = log_path 110 self.config.start_time = time.strftime("%Y-%m-%d %H:%M:%S", start_time) 111 RuntimeLogs.start_task_log(self.config.log_path) 112 RuntimeLogs.start_encrypt_log(self.config.log_path) 113 LOG.info("Report path: %s", report_path) 114 115 def _get_task_dir(self, task_file): 116 from xdevice import Variables 117 exec_task_dir = os.path.abspath( 118 os.path.join(Variables.exec_dir, self.TASK_CONFIG_DIR)) 119 if not os.path.exists(os.path.join(exec_task_dir, task_file)): 120 err_msg = ErrorMessage.Common.Code_0101015.format(task_file) 121 if os.path.normcase(Variables.exec_dir) == \ 122 os.path.normcase(Variables.top_dir): 123 raise ParamError(err_msg) 124 125 top_task_dir = os.path.abspath( 126 os.path.join(Variables.top_dir, self.TASK_CONFIG_DIR)) 127 if not os.path.exists(os.path.join(top_task_dir, task_file)): 128 raise ParamError(err_msg) 129 else: 130 return top_task_dir 131 else: 132 return exec_task_dir 133 134 def _load_task(self, task_dir, file_name): 135 task_file = os.path.join(task_dir, file_name) 136 if not os.path.exists(task_file): 137 raise ParamError(ErrorMessage.Common.Code_0101015.format(task_file)) 138 139 # add kits to self.config 140 json_config = JsonParser(task_file) 141 kits = get_kit_instances(json_config, self.config.resource_path, 142 self.config.testcases_path) 143 self.config.kits.extend(kits) 144 145 def set_root_descriptor(self, root): 146 if not isinstance(root, Descriptor): 147 raise TypeError("need 'Descriptor' type param") 148 149 self.root = root 150 self._init_driver(root) 151 if not self.test_drivers: 152 LOG.error(ErrorMessage.Common.Code_0101016) 153 154 def _init_driver(self, test_descriptor): 155 156 plugin_id = None 157 source = test_descriptor.source 158 ignore_test = "" 159 if isinstance(source, TestSource): 160 if source.test_type is not None: 161 plugin_id = source.test_type 162 else: 163 ignore_test = source.module_name 164 LOG.error(ErrorMessage.Common.Code_0101017.format(source.test_name)) 165 166 drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id) 167 if plugin_id is not None: 168 if len(drivers) == 0: 169 ignore_test = source.module_name 170 error_message = ErrorMessage.Common.Code_0101018.format(source.test_name, plugin_id) 171 LOG.error(error_message) 172 report_not_executed(self.config.report_path, [("", test_descriptor)], error_message) 173 else: 174 check_result = False 175 for driver in drivers: 176 driver_instance = driver.__class__() 177 device_options = get_device_options( 178 self.config.__dict__, source) 179 check_result = driver_instance.__check_environment__( 180 device_options) 181 if check_result or check_result is None: 182 self.test_drivers.append( 183 (driver_instance, test_descriptor)) 184 break 185 if check_result is False: 186 LOG.error(ErrorMessage.Common.Code_0101019.format(source.test_name, plugin_id)) 187 if ignore_test and hasattr(self.config, ConfigConst.component_mapper): 188 getattr(self.config, ConfigConst.component_mapper).pop(ignore_test) 189 190 for desc in test_descriptor.children: 191 self._init_driver(desc) 192 193 @classmethod 194 def _check_report_path(cls, report_path): 195 for _, _, files in os.walk(report_path): 196 for _file in files: 197 if _file.endswith(".xml"): 198 raise ParamError(ErrorMessage.Common.Code_0101020) 199 200 201class Request: 202 """ 203 Provides the necessary information for TestDriver to execute its tests. 204 """ 205 206 def __init__(self, uuid=None, root=None, listeners=None, config=None): 207 self.uuid = uuid 208 self.root = root 209 self.listeners = listeners if listeners else [] 210 self.config = config 211 212 def get_listeners(self): 213 return self.listeners 214 215 def get_config(self): 216 return self.config 217 218 def get(self, key=None, default=""): 219 # get value from self.config 220 if not key: 221 return default 222 return getattr(self.config, key, default) 223 224 def get_devices(self): 225 if self.config is None: 226 return [] 227 if not hasattr(self.config, "environment"): 228 return [] 229 if not hasattr(self.config.environment, "devices"): 230 return [] 231 return getattr(self.config.environment, "devices", []) 232 233 def get_config_file(self): 234 return self._get_source_value("config_file") 235 236 def get_source_file(self): 237 return self._get_source_value("source_file") 238 239 def get_test_name(self): 240 return self._get_source_value("test_name") 241 242 def get_source_string(self): 243 return self._get_source_value("source_string") 244 245 def get_test_type(self): 246 return self._get_source_value("test_type") 247 248 def get_module_name(self): 249 return self._get_source_value("module_name") 250 251 def get_repeat_round(self): 252 return get_repeat_round(self.root.unique_id) 253 254 def _get_source(self): 255 if not hasattr(self.root, "source"): 256 return "" 257 return getattr(self.root, "source", "") 258 259 def _get_source_value(self, key=None, default=""): 260 if not key: 261 return default 262 source = self._get_source() 263 if not source: 264 return default 265 return getattr(source, key, default) 266