#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import os

from _core.constants import ModeType
from _core.constants import ConfigConst
from _core.exception import ParamError
from _core.executor.source import TestSource
from _core.logger import platform_logger
from _core.plugin import Config
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.testkit.json_parser import JsonParser
from _core.utils import get_kit_instances
from _core.utils import get_cst_time

__all__ = ["Descriptor", "Task", "Request"]
LOG = platform_logger("Request")


class Descriptor:
    """
    The descriptor for a test or suite
    """

    def __init__(self, uuid=None, name=None, source=None, con=False,
                 error=None):
        """
        Store the identity and source of one node of the test tree.

        ``uuid`` is kept as ``unique_id`` and ``name`` as ``display_name``;
        ``source``, ``con`` and ``error`` are stored unchanged. ``tags``,
        ``parent`` and ``children`` start out empty.
        """
        self.unique_id = uuid
        self.display_name = name
        self.tags = {}
        self.source = source
        self.parent = None
        self.children = []
        self.con = con
        self.error = error

    def get_con(self):
        """Return the ``con`` value given at construction."""
        return self.con

    def get_unique_id(self):
        """Return the unique id given at construction."""
        return self.unique_id


class Task:
    """
    TestTask describes the tree of tests and suites
    """
    EMPTY_TASK = "empty"
    TASK_CONFIG_SUFFIX = ".json"
    TASK_CONFIG_DIR = "config"
    life_stage_listener = None

    def __init__(self, root=None, drivers=None, config=None):
        """
        Create a task.

        ``drivers`` defaults to a new empty list and ``config`` to a new
        ``Config`` instance when they are not supplied (or are falsy).
        """
        self.root = root
        self.test_drivers = drivers or []
        self.config = config or Config()

    def init(self, config):
        """
        Prepare the task for execution.

        Merges the attributes of ``config`` into ``self.config``, decides
        the task name, creates the report and log folders, loads the kits of
        the optional task file and starts the task logs.

        Raises:
            ParamError: if the report folder already contains an xml file,
                or if the requested task file cannot be found.
        """
        from xdevice import Variables
        from xdevice import Scheduler
        start_time = get_cst_time()
        LOG.debug("StartTime=%s" % start_time.strftime("%Y-%m-%d %H:%M:%S"))

        self.config.update(config.__dict__)
        # Treat every falsy report path (missing, "" or None) as "not set";
        # a None task name would make os.path.join() below raise TypeError.
        if not getattr(config, ConfigConst.report_path, ""):
            Variables.task_name = start_time.strftime('%Y-%m-%d-%H-%M-%S')
        else:
            Variables.task_name = config.report_path

        # create a report folder to store test report
        report_path = os.path.join(Variables.exec_dir,
                                   Variables.report_vars.report_dir,
                                   Variables.task_name)
        os.makedirs(report_path, exist_ok=True)
        self._check_report_path(report_path)

        log_path = os.path.join(report_path, Variables.report_vars.log_dir)
        os.makedirs(log_path, exist_ok=True)

        self.config.kits = []
        if getattr(config, "task", ""):
            task_file = config.task + self.TASK_CONFIG_SUFFIX
            task_dir = self._get_task_dir(task_file)
            self._load_task(task_dir, task_file)

        self.config.top_dir = Variables.top_dir
        self.config.exec_dir = Variables.exec_dir
        self.config.report_path = report_path
        self.config.log_path = log_path
        self.config.start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
        Scheduler.start_task_log(self.config.log_path)
        Scheduler.start_encrypt_log(self.config.log_path)
        LOG.info("Report path: %s", report_path)

    def _get_task_dir(self, task_file):
        """
        Return the directory that contains ``task_file``.

        The ``config`` folder under ``Variables.exec_dir`` is checked first,
        then the one under ``Variables.top_dir``.

        Raises:
            ParamError: if the file is in neither folder.
        """
        from xdevice import Variables
        exec_task_dir = os.path.abspath(
            os.path.join(Variables.exec_dir, self.TASK_CONFIG_DIR))
        if os.path.exists(os.path.join(exec_task_dir, task_file)):
            return exec_task_dir

        # exec_dir and top_dir are the same folder: nowhere else to look
        if os.path.normcase(Variables.exec_dir) == \
                os.path.normcase(Variables.top_dir):
            raise ParamError("task file %s not exists, please add task "
                             "file to '%s'" % (task_file, exec_task_dir),
                             error_no="00101")

        top_task_dir = os.path.abspath(
            os.path.join(Variables.top_dir, self.TASK_CONFIG_DIR))
        if not os.path.exists(os.path.join(top_task_dir, task_file)):
            raise ParamError("task file %s not exists, please add task "
                             "file to '%s' or '%s'" % (
                                 task_file, exec_task_dir, top_task_dir),
                             error_no="00101")
        return top_task_dir

    def _load_task(self, task_dir, file_name):
        """
        Parse the task file and append its kits to ``self.config.kits``.

        Raises:
            ParamError: if ``file_name`` does not exist in ``task_dir``.
        """
        task_file = os.path.join(task_dir, file_name)
        if not os.path.exists(task_file):
            raise ParamError("task file %s not exists" % task_file,
                             error_no="00101")

        # add kits to self.config
        json_config = JsonParser(task_file)
        kits = get_kit_instances(json_config, self.config.resource_path,
                                 self.config.testcases_path)
        self.config.kits.extend(kits)

    def set_root_descriptor(self, root):
        """
        Set the root of the test tree and collect a driver for each node.

        Raises:
            TypeError: if ``root`` is not a ``Descriptor``.
        """
        if not isinstance(root, Descriptor):
            raise TypeError("need 'Descriptor' type param")

        self.root = root
        self._init_driver(root)
        if not self.test_drivers:
            LOG.error("No test driver to execute", error_no="00106")

    def _init_driver(self, test_descriptor):
        """
        Find a driver for ``test_descriptor`` and then for its children.

        When the descriptor's source is a ``TestSource`` with a test type,
        the driver plugins registered for that type are tried in order and
        the first one whose ``__check_environment__`` returns a truthy value
        or ``None`` is appended to ``self.test_drivers`` together with the
        descriptor. Sources for which no driver is specified or found are
        removed from the config's component mapper, if it has one.
        """
        from xdevice import Scheduler

        plugin_id = None
        source = test_descriptor.source
        ignore_test = ""
        if isinstance(source, TestSource):
            if source.test_type is not None:
                plugin_id = source.test_type
            else:
                ignore_test = source.module_name
                LOG.error("'%s' no test driver specified" % source.test_name,
                          error_no="00106")

        drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id)
        if plugin_id is not None:
            if not drivers:
                ignore_test = source.module_name
                error_message = "'%s' can not find test driver '%s'" % (
                    source.test_name, plugin_id)
                LOG.error(error_message, error_no="00106")
                if Scheduler.mode == ModeType.decc:
                    error_message = "Load Error[00106]"
                Scheduler.report_not_executed(self.config.report_path, [
                    ("", test_descriptor)], error_message)
            else:
                check_result = False
                for driver in drivers:
                    # use a fresh instance of the registered driver's class
                    driver_instance = driver.__class__()
                    device_options = Scheduler.get_device_options(
                        self.config.__dict__, source)
                    check_result = driver_instance.__check_environment__(
                        device_options)
                    if check_result or check_result is None:
                        self.test_drivers.append(
                            (driver_instance, test_descriptor))
                        break
                if check_result is False:
                    LOG.error("'%s' can not find suitable test driver '%s'" %
                              (source.test_name, plugin_id), error_no="00106")
        if ignore_test and hasattr(self.config, ConfigConst.component_mapper):
            # The module may be absent from the mapper (or already removed
            # for an earlier descriptor); a default avoids a KeyError here.
            getattr(self.config, ConfigConst.component_mapper).pop(
                ignore_test, None)

        for desc in test_descriptor.children:
            self._init_driver(desc)

    @classmethod
    def _check_report_path(cls, report_path):
        """
        Ensure no ``.xml`` file exists anywhere below ``report_path``.

        Raises:
            ParamError: if an xml file is found.
        """
        for _, _, files in os.walk(report_path):
            if any(_file.endswith(".xml") for _file in files):
                raise ParamError("xml file exists in '%s'" % report_path,
                                 error_no="00105")


class Request:
    """
    Provides the necessary information for TestDriver to execute its tests.
    """

    def __init__(self, uuid=None, root=None, listeners=None, config=None):
        """
        Store the request data; ``listeners`` defaults to a new empty list.
        """
        self.uuid = uuid
        self.root = root
        self.listeners = listeners if listeners else []
        self.config = config

    def get_listeners(self):
        """Return the listeners of this request."""
        return self.listeners

    def get_config(self):
        """Return the config object of this request."""
        return self.config

    def get(self, key=None, default=""):
        """
        Return attribute ``key`` of ``self.config``.

        ``default`` is returned when ``key`` is empty or the attribute is
        missing.
        """
        # get value from self.config
        if not key:
            return default
        return getattr(self.config, key, default)

    def get_devices(self):
        """
        Return ``self.config.environment.devices``.

        An empty list is returned when the config, its ``environment`` or
        the ``devices`` attribute is missing.
        """
        environment = getattr(self.config, "environment", None)
        return getattr(environment, "devices", [])

    def get_config_file(self):
        """Return the ``config_file`` attribute of the root's source."""
        return self._get_source_value("config_file")

    def get_source_file(self):
        """Return the ``source_file`` attribute of the root's source."""
        return self._get_source_value("source_file")

    def get_test_name(self):
        """Return the ``test_name`` attribute of the root's source."""
        return self._get_source_value("test_name")

    def get_source_string(self):
        """Return the ``source_string`` attribute of the root's source."""
        return self._get_source_value("source_string")

    def get_test_type(self):
        """Return the ``test_type`` attribute of the root's source."""
        return self._get_source_value("test_type")

    def get_module_name(self):
        """Return the ``module_name`` attribute of the root's source."""
        return self._get_source_value("module_name")

    def _get_source(self):
        """Return ``self.root.source``, or ``""`` when it is not available."""
        return getattr(self.root, "source", "")

    def _get_source_value(self, key=None, default=""):
        """
        Return attribute ``key`` of the root's source.

        ``default`` is returned when ``key`` is empty, the source is falsy,
        or the source has no such attribute.
        """
        if not key:
            return default
        source = self._get_source()
        if not source:
            return default
        return getattr(source, key, default)