1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import datetime 20import os 21 22from _core.constants import ModeType 23from _core.constants import ConfigConst 24from _core.exception import ParamError 25from _core.executor.source import TestSource 26from _core.logger import platform_logger 27from _core.plugin import Config 28from _core.plugin import Plugin 29from _core.plugin import get_plugin 30from _core.testkit.json_parser import JsonParser 31from _core.utils import get_kit_instances 32from _core.utils import get_cst_time 33 34__all__ = ["Descriptor", "Task", "Request"] 35LOG = platform_logger("Request") 36 37 38class Descriptor: 39 """ 40 The descriptor for a test or suite 41 """ 42 43 def __init__(self, uuid=None, name=None, source=None, container=False): 44 self.unique_id = uuid 45 self.display_name = name 46 self.tags = {} 47 self.source = source 48 self.parent = None 49 self.children = [] 50 self.container = container 51 52 def get_container(self): 53 return self.container 54 55 def get_unique_id(self): 56 return self.unique_id 57 58 59class Task: 60 """ 61 TestTask describes the tree of tests and suites 62 """ 63 EMPTY_TASK = "empty" 64 TASK_CONFIG_SUFFIX = ".json" 65 TASK_CONFIG_DIR = "config" 66 67 def __init__(self, root=None, drivers=None, config=None): 68 self.root = root 69 self.test_drivers = drivers or [] 70 self.config = config or Config() 71 72 def init(self, config): 73 from xdevice import Variables 74 from xdevice import Scheduler 75 start_time = get_cst_time() 76 LOG.debug("StartTime=%s" % start_time.strftime("%Y-%m-%d %H:%M:%S")) 77 78 self.config.update(config.__dict__) 79 if getattr(config, ConfigConst.report_path, "") == "": 80 Variables.task_name = start_time.strftime('%Y-%m-%d-%H-%M-%S') 81 else: 82 Variables.task_name = config.report_path 83 84 # create a report folder to store test report 85 report_path = os.path.join(Variables.exec_dir, 86 Variables.report_vars.report_dir, 87 Variables.task_name) 88 os.makedirs(report_path, exist_ok=True) 89 self._check_report_path(report_path) 90 91 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 92 os.makedirs(log_path, exist_ok=True) 93 94 self.config.kits = [] 95 if getattr(config, "task", ""): 96 task_file = config.task + self.TASK_CONFIG_SUFFIX 97 task_dir = self._get_task_dir(task_file) 98 self._load_task(task_dir, task_file) 99 100 self.config.top_dir = Variables.top_dir 101 self.config.exec_dir = Variables.exec_dir 102 self.config.report_path = report_path 103 self.config.log_path = log_path 104 self.config.start_time = start_time.strftime("%Y-%m-%d %H:%M:%S") 105 Scheduler.start_task_log(self.config.log_path) 106 Scheduler.start_encrypt_log(self.config.log_path) 107 LOG.info("Report path: %s", report_path) 108 109 def _get_task_dir(self, task_file): 110 from xdevice import Variables 111 exec_task_dir = os.path.abspath( 112 os.path.join(Variables.exec_dir, self.TASK_CONFIG_DIR)) 113 if not os.path.exists(os.path.join(exec_task_dir, task_file)): 114 if os.path.normcase(Variables.exec_dir) == \ 115 os.path.normcase(Variables.top_dir): 116 raise ParamError("task file %s not exists, please add task " 117 "file to '%s'" % (task_file, exec_task_dir), 118 error_no="00101") 119 120 top_task_dir = os.path.abspath( 121 os.path.join(Variables.top_dir, self.TASK_CONFIG_DIR)) 122 if not os.path.exists(os.path.join(top_task_dir, task_file)): 123 raise ParamError("task file %s not exists, please add task " 124 "file to '%s' or '%s'" % ( 125 task_file, exec_task_dir, top_task_dir), 126 error_no="00101") 127 else: 128 return top_task_dir 129 else: 130 return exec_task_dir 131 132 def _load_task(self, task_dir, file_name): 133 task_file = os.path.join(task_dir, file_name) 134 if not os.path.exists(task_file): 135 raise ParamError("task file %s not exists" % task_file, 136 error_no="00101") 137 138 # add kits to self.config 139 json_config = JsonParser(task_file) 140 kits = get_kit_instances(json_config, self.config.resource_path, 141 self.config.testcases_path) 142 self.config.kits.extend(kits) 143 144 def set_root_descriptor(self, root): 145 if not isinstance(root, Descriptor): 146 raise TypeError("need 'Descriptor' type param") 147 148 self.root = root 149 self._init_driver(root) 150 if not self.test_drivers: 151 LOG.error("No test driver to execute", error_no="00106") 152 153 def _init_driver(self, test_descriptor): 154 from xdevice import Scheduler 155 156 plugin_id = None 157 source = test_descriptor.source 158 ignore_test = "" 159 if isinstance(source, TestSource): 160 if source.test_type is not None: 161 plugin_id = source.test_type 162 else: 163 ignore_test = source.module_name 164 LOG.error("'%s' no test driver specified" % source.test_name, 165 error_no="00106") 166 167 drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id) 168 if plugin_id is not None: 169 if len(drivers) == 0: 170 ignore_test = source.module_name 171 error_message = "'%s' can not find test driver '%s'" % ( 172 source.test_name, plugin_id) 173 LOG.error(error_message, error_no="00106") 174 if Scheduler.mode == ModeType.decc: 175 error_message = "Load Error[00106]" 176 Scheduler.report_not_executed(self.config.report_path, [ 177 ("", test_descriptor)], error_message) 178 else: 179 check_result = False 180 for driver in drivers: 181 driver_instance = driver.__class__() 182 device_options = Scheduler.get_device_options( 183 self.config.__dict__, source) 184 check_result = driver_instance.__check_environment__( 185 device_options) 186 if check_result or check_result is None: 187 self.test_drivers.append( 188 (driver_instance, test_descriptor)) 189 break 190 if check_result is False: 191 LOG.error("'%s' can not find suitable test driver '%s'" % 192 (source.test_name, plugin_id), error_no="00106") 193 if ignore_test and hasattr(self.config, ConfigConst.component_mapper): 194 getattr(self.config, ConfigConst.component_mapper).pop(ignore_test) 195 196 for desc in test_descriptor.children: 197 self._init_driver(desc) 198 199 @classmethod 200 def _check_report_path(cls, report_path): 201 for _, _, files in os.walk(report_path): 202 for _file in files: 203 if _file.endswith(".xml"): 204 raise ParamError("xml file exists in '%s'" % report_path, 205 error_no="00105") 206 207 208class Request: 209 """ 210 Provides the necessary information for TestDriver to execute its tests. 211 """ 212 213 def __init__(self, uuid=None, root=None, listeners=None, config=None): 214 self.uuid = uuid 215 self.root = root 216 self.listeners = listeners if listeners else [] 217 self.config = config 218 219 def get_listeners(self): 220 return self.listeners 221 222 def get_config(self): 223 return self.config 224 225 def get(self, key=None, default=""): 226 # get value from self.config 227 if not key: 228 return default 229 return getattr(self.config, key, default) 230 231 def get_devices(self): 232 if self.config is None: 233 return [] 234 if not hasattr(self.config, "environment"): 235 return [] 236 if not hasattr(self.config.environment, "devices"): 237 return [] 238 return getattr(self.config.environment, "devices", []) 239 240 def get_config_file(self): 241 return self._get_source_value("config_file") 242 243 def get_source_file(self): 244 return self._get_source_value("source_file") 245 246 def get_test_name(self): 247 return self._get_source_value("test_name") 248 249 def get_source_string(self): 250 return self._get_source_value("source_string") 251 252 def get_test_type(self): 253 return self._get_source_value("test_type") 254 255 def get_module_name(self): 256 return self._get_source_value("module_name") 257 258 def _get_source(self): 259 if not hasattr(self.root, "source"): 260 return "" 261 return getattr(self.root, "source", "") 262 263 def _get_source_value(self, key=None, default=""): 264 if not key: 265 return default 266 source = self._get_source() 267 if not source: 268 return default 269 return getattr(source, key, default) 270