• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import datetime
20import os
21
22from _core.constants import ModeType
23from _core.constants import ConfigConst
24from _core.exception import ParamError
25from _core.executor.source import TestSource
26from _core.logger import platform_logger
27from _core.plugin import Config
28from _core.plugin import Plugin
29from _core.plugin import get_plugin
30from _core.testkit.json_parser import JsonParser
31from _core.utils import get_kit_instances
32from _core.utils import get_cst_time
33
34__all__ = ["Descriptor", "Task", "Request"]
35LOG = platform_logger("Request")
36
37
class Descriptor:
    """
    Node describing one test or one suite.

    Attributes set by the constructor:
        unique_id:    value passed as ``uuid``
        display_name: value passed as ``name``
        tags:         fresh empty dict
        source:       value passed as ``source``
        parent:       starts as None
        children:     fresh empty list
        con:          value passed as ``con``
        error:        value passed as ``error``
    """

    def __init__(self, uuid=None, name=None, source=None, con=False, error=None):
        # Identification of this node.
        self.unique_id, self.display_name = uuid, name
        # Per-instance containers; never shared between descriptors.
        self.tags, self.source = {}, source
        # Tree links: no parent and no children until a caller sets them.
        self.parent, self.children = None, []
        # Remaining caller-supplied values, stored untouched.
        self.con, self.error = con, error

    def get_con(self):
        """Return the ``con`` value given at construction."""
        return self.con

    def get_unique_id(self):
        """Return the identifier given as ``uuid`` at construction."""
        return self.unique_id
58
59
class Task:
    """
    TestTask describes the tree of tests and suites

    A task keeps the root ``Descriptor``, the list of
    ``(driver_instance, descriptor)`` pairs selected for execution and the
    task-level configuration object.
    """
    EMPTY_TASK = "empty"
    TASK_CONFIG_SUFFIX = ".json"
    TASK_CONFIG_DIR = "config"
    life_stage_listener = None

    def __init__(self, root=None, drivers=None, config=None):
        """
        :param root: root descriptor of the test tree, may be set later
                     through ``set_root_descriptor``
        :param drivers: initial list of (driver, descriptor) pairs; a falsy
                        value yields a new empty list
        :param config: configuration object; a falsy value yields ``Config()``
        """
        self.root = root
        self.test_drivers = drivers or []
        self.config = config or Config()

    def init(self, config):
        """
        Prepare the task for a run.

        Merges the attributes of ``config`` into ``self.config``, chooses the
        task name, creates the report and log folders, loads the kits of the
        optional task file and starts the task logs.

        :param config: object whose ``__dict__`` is merged into the task
                       configuration; ``report_path`` and ``task`` are read
                       from it when present
        :raises ParamError: when the report folder already contains an xml
                            file or the requested task file cannot be found
        """
        # NOTE(review): xdevice is imported inside the method, presumably to
        # avoid a circular import at module load time - confirm.
        from xdevice import Variables
        from xdevice import Scheduler
        start_time = get_cst_time()
        # Formatted once; reused for the debug line and config.start_time.
        start_time_text = start_time.strftime("%Y-%m-%d %H:%M:%S")
        LOG.debug("StartTime=%s" % start_time_text)

        self.config.update(config.__dict__)
        # Any falsy report path (missing, "" or None) falls back to a
        # timestamp name; a None value used to reach os.path.join and fail
        # there with TypeError.
        if not getattr(config, ConfigConst.report_path, ""):
            Variables.task_name = start_time.strftime('%Y-%m-%d-%H-%M-%S')
        else:
            Variables.task_name = config.report_path

        # create a report folder to store test report
        report_path = os.path.join(Variables.exec_dir,
                                   Variables.report_vars.report_dir,
                                   Variables.task_name)
        os.makedirs(report_path, exist_ok=True)
        self._check_report_path(report_path)

        log_path = os.path.join(report_path, Variables.report_vars.log_dir)
        os.makedirs(log_path, exist_ok=True)

        # kits always start empty; they are only filled from a task file
        self.config.kits = []
        if getattr(config, "task", ""):
            task_file = config.task + self.TASK_CONFIG_SUFFIX
            task_dir = self._get_task_dir(task_file)
            self._load_task(task_dir, task_file)

        self.config.top_dir = Variables.top_dir
        self.config.exec_dir = Variables.exec_dir
        self.config.report_path = report_path
        self.config.log_path = log_path
        self.config.start_time = start_time_text
        Scheduler.start_task_log(self.config.log_path)
        Scheduler.start_encrypt_log(self.config.log_path)
        LOG.info("Report path: %s", report_path)

    def _get_task_dir(self, task_file):
        """
        Locate the folder that contains ``task_file``.

        The ``config`` folder below ``Variables.exec_dir`` is checked first,
        then the ``config`` folder below ``Variables.top_dir``.

        :param task_file: file name of the task configuration
        :return: absolute path of the folder holding the task file
        :raises ParamError: when the file is in neither folder
        """
        from xdevice import Variables
        exec_task_dir = os.path.abspath(
            os.path.join(Variables.exec_dir, self.TASK_CONFIG_DIR))
        if os.path.exists(os.path.join(exec_task_dir, task_file)):
            return exec_task_dir

        # exec_dir and top_dir are the same place: no second folder to try
        if os.path.normcase(Variables.exec_dir) == \
                os.path.normcase(Variables.top_dir):
            raise ParamError("task file %s not exists, please add task "
                             "file to '%s'" % (task_file, exec_task_dir),
                             error_no="00101")

        top_task_dir = os.path.abspath(
            os.path.join(Variables.top_dir, self.TASK_CONFIG_DIR))
        if not os.path.exists(os.path.join(top_task_dir, task_file)):
            raise ParamError("task file %s not exists, please add task "
                             "file to '%s' or '%s'" % (
                                 task_file, exec_task_dir, top_task_dir),
                             error_no="00101")
        return top_task_dir

    def _load_task(self, task_dir, file_name):
        """
        Parse the task file and append its kits to ``self.config.kits``.

        :param task_dir: folder that holds the task file
        :param file_name: file name of the task configuration
        :raises ParamError: when the task file does not exist
        """
        task_file = os.path.join(task_dir, file_name)
        if not os.path.exists(task_file):
            raise ParamError("task file %s not exists" % task_file,
                             error_no="00101")

        # add kits to self.config
        json_config = JsonParser(task_file)
        kits = get_kit_instances(json_config, self.config.resource_path,
                                 self.config.testcases_path)
        self.config.kits.extend(kits)

    def set_root_descriptor(self, root):
        """
        Store ``root`` as the task root and select drivers for its tree.

        An error is logged when no driver could be selected at all.

        :param root: root ``Descriptor`` of the test tree
        :raises TypeError: when ``root`` is not a ``Descriptor``
        """
        if not isinstance(root, Descriptor):
            raise TypeError("need 'Descriptor' type param")

        self.root = root
        self._init_driver(root)
        if not self.test_drivers:
            LOG.error("No test driver to execute", error_no="00106")

    def _init_driver(self, test_descriptor):
        """
        Select a driver for ``test_descriptor`` and, recursively, for each
        of its children.

        When the descriptor source is a ``TestSource`` with a test type, the
        driver plugins registered under that type are tried in order; the
        first one whose ``__check_environment__`` returns a truthy value or
        None is appended to ``self.test_drivers`` together with the
        descriptor. Modules without a usable driver type are removed from
        the component mapper of the configuration, when it has one.

        :param test_descriptor: descriptor to process
        """
        from xdevice import Scheduler

        plugin_id = None
        source = test_descriptor.source
        ignore_test = ""
        if isinstance(source, TestSource):
            if source.test_type is not None:
                plugin_id = source.test_type
            else:
                ignore_test = source.module_name
                LOG.error("'%s' no test driver specified" % source.test_name,
                          error_no="00106")

        drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id)
        if plugin_id is not None:
            if not drivers:
                ignore_test = source.module_name
                error_message = "'%s' can not find test driver '%s'" % (
                    source.test_name, plugin_id)
                LOG.error(error_message, error_no="00106")
                if Scheduler.mode == ModeType.decc:
                    error_message = "Load Error[00106]"
                    Scheduler.report_not_executed(self.config.report_path, [
                        ("", test_descriptor)], error_message)
            else:
                for driver in drivers:
                    # each descriptor gets its own driver instance
                    driver_instance = driver.__class__()
                    device_options = Scheduler.get_device_options(
                        self.config.__dict__, source)
                    check_result = driver_instance.__check_environment__(
                        device_options)
                    if check_result or check_result is None:
                        self.test_drivers.append(
                            (driver_instance, test_descriptor))
                        break
                else:
                    # Reached only when no candidate was accepted; this
                    # also covers rejections signalled by a falsy value
                    # other than False, which used to pass silently.
                    LOG.error("'%s' can not find suitable test driver '%s'" %
                              (source.test_name, plugin_id), error_no="00106")
        if ignore_test and hasattr(self.config, ConfigConst.component_mapper):
            # Default given so that a module missing from the mapper, or
            # already removed for an earlier descriptor, raises no KeyError.
            getattr(self.config, ConfigConst.component_mapper).pop(
                ignore_test, None)

        for desc in test_descriptor.children:
            self._init_driver(desc)

    @classmethod
    def _check_report_path(cls, report_path):
        """
        Refuse a report folder that already holds xml files.

        The folder is walked recursively.

        :param report_path: folder to inspect
        :raises ParamError: when any file name below it ends with ".xml"
        """
        for _, _, files in os.walk(report_path):
            if any(_file.endswith(".xml") for _file in files):
                raise ParamError("xml file exists in '%s'" % report_path,
                                 error_no="00105")
208
209
class Request:
    """
    Carries what a TestDriver needs to run its tests: an identifier, the
    root descriptor, the listeners and the configuration object.
    """

    def __init__(self, uuid=None, root=None, listeners=None, config=None):
        self.uuid = uuid
        self.root = root
        # a falsy ``listeners`` argument is replaced by a new empty list
        self.listeners = listeners or []
        self.config = config

    def get_listeners(self):
        """Return the listener list held by this request."""
        return self.listeners

    def get_config(self):
        """Return the configuration object held by this request."""
        return self.config

    def get(self, key=None, default=""):
        """
        Read attribute ``key`` from the configuration.

        ``default`` is returned when ``key`` is falsy or the configuration
        has no such attribute.
        """
        return getattr(self.config, key, default) if key else default

    def get_devices(self):
        """
        Return ``config.environment.devices``.

        An empty list is returned when the configuration is None or any
        link of that attribute chain is missing.
        """
        environment = getattr(self.config, "environment", None)
        return getattr(environment, "devices", [])

    def get_config_file(self):
        """Return ``config_file`` of the root source, or ""."""
        return self._get_source_value("config_file")

    def get_source_file(self):
        """Return ``source_file`` of the root source, or ""."""
        return self._get_source_value("source_file")

    def get_test_name(self):
        """Return ``test_name`` of the root source, or ""."""
        return self._get_source_value("test_name")

    def get_source_string(self):
        """Return ``source_string`` of the root source, or ""."""
        return self._get_source_value("source_string")

    def get_test_type(self):
        """Return ``test_type`` of the root source, or ""."""
        return self._get_source_value("test_type")

    def get_module_name(self):
        """Return ``module_name`` of the root source, or ""."""
        return self._get_source_value("module_name")

    def _get_source(self):
        """Return ``root.source``, or "" when the root has no such attribute."""
        return getattr(self.root, "source", "")

    def _get_source_value(self, key=None, default=""):
        """
        Read attribute ``key`` from the root source.

        ``default`` is returned when ``key`` is falsy, the source is falsy
        or the source has no such attribute.
        """
        source = self._get_source() if key else None
        if not source:
            return default
        return getattr(source, key, default)
272