• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import datetime
20import os
21
22from _core.constants import ModeType
23from _core.constants import ConfigConst
24from _core.exception import ParamError
25from _core.executor.source import TestSource
26from _core.logger import platform_logger
27from _core.plugin import Config
28from _core.plugin import Plugin
29from _core.plugin import get_plugin
30from _core.testkit.json_parser import JsonParser
31from _core.utils import get_kit_instances
32from _core.utils import get_cst_time
33
34__all__ = ["Descriptor", "Task", "Request"]
35LOG = platform_logger("Request")
36
37
38class Descriptor:
39    """
40    The descriptor for a test or suite
41    """
42
43    def __init__(self, uuid=None, name=None, source=None, container=False):
44        self.unique_id = uuid
45        self.display_name = name
46        self.tags = {}
47        self.source = source
48        self.parent = None
49        self.children = []
50        self.container = container
51
52    def get_container(self):
53        return self.container
54
55    def get_unique_id(self):
56        return self.unique_id
57
58
59class Task:
60    """
61    TestTask describes the tree of tests and suites
62    """
63    EMPTY_TASK = "empty"
64    TASK_CONFIG_SUFFIX = ".json"
65    TASK_CONFIG_DIR = "config"
66
67    def __init__(self, root=None, drivers=None, config=None):
68        self.root = root
69        self.test_drivers = drivers or []
70        self.config = config or Config()
71
72    def init(self, config):
73        from xdevice import Variables
74        from xdevice import Scheduler
75        start_time = get_cst_time()
76        LOG.debug("StartTime=%s" % start_time.strftime("%Y-%m-%d %H:%M:%S"))
77
78        self.config.update(config.__dict__)
79        if getattr(config, ConfigConst.report_path, "") == "":
80            Variables.task_name = start_time.strftime('%Y-%m-%d-%H-%M-%S')
81        else:
82            Variables.task_name = config.report_path
83
84        # create a report folder to store test report
85        report_path = os.path.join(Variables.exec_dir,
86                                   Variables.report_vars.report_dir,
87                                   Variables.task_name)
88        os.makedirs(report_path, exist_ok=True)
89        self._check_report_path(report_path)
90
91        log_path = os.path.join(report_path, Variables.report_vars.log_dir)
92        os.makedirs(log_path, exist_ok=True)
93
94        self.config.kits = []
95        if getattr(config, "task", ""):
96            task_file = config.task + self.TASK_CONFIG_SUFFIX
97            task_dir = self._get_task_dir(task_file)
98            self._load_task(task_dir, task_file)
99
100        self.config.top_dir = Variables.top_dir
101        self.config.exec_dir = Variables.exec_dir
102        self.config.report_path = report_path
103        self.config.log_path = log_path
104        self.config.start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
105        Scheduler.start_task_log(self.config.log_path)
106        Scheduler.start_encrypt_log(self.config.log_path)
107        LOG.info("Report path: %s", report_path)
108
109    def _get_task_dir(self, task_file):
110        from xdevice import Variables
111        exec_task_dir = os.path.abspath(
112            os.path.join(Variables.exec_dir, self.TASK_CONFIG_DIR))
113        if not os.path.exists(os.path.join(exec_task_dir, task_file)):
114            if os.path.normcase(Variables.exec_dir) == \
115                    os.path.normcase(Variables.top_dir):
116                raise ParamError("task file %s not exists, please add task "
117                                 "file to '%s'" % (task_file, exec_task_dir),
118                                 error_no="00101")
119
120            top_task_dir = os.path.abspath(
121                os.path.join(Variables.top_dir, self.TASK_CONFIG_DIR))
122            if not os.path.exists(os.path.join(top_task_dir, task_file)):
123                raise ParamError("task file %s not exists, please add task "
124                                 "file to '%s' or '%s'" % (
125                                     task_file, exec_task_dir, top_task_dir),
126                                 error_no="00101")
127            else:
128                return top_task_dir
129        else:
130            return exec_task_dir
131
132    def _load_task(self, task_dir, file_name):
133        task_file = os.path.join(task_dir, file_name)
134        if not os.path.exists(task_file):
135            raise ParamError("task file %s not exists" % task_file,
136                             error_no="00101")
137
138        # add kits to self.config
139        json_config = JsonParser(task_file)
140        kits = get_kit_instances(json_config, self.config.resource_path,
141                                 self.config.testcases_path)
142        self.config.kits.extend(kits)
143
144    def set_root_descriptor(self, root):
145        if not isinstance(root, Descriptor):
146            raise TypeError("need 'Descriptor' type param")
147
148        self.root = root
149        self._init_driver(root)
150        if not self.test_drivers:
151            LOG.error("No test driver to execute", error_no="00106")
152
153    def _init_driver(self, test_descriptor):
154        from xdevice import Scheduler
155
156        plugin_id = None
157        source = test_descriptor.source
158        ignore_test = ""
159        if isinstance(source, TestSource):
160            if source.test_type is not None:
161                plugin_id = source.test_type
162            else:
163                ignore_test = source.module_name
164                LOG.error("'%s' no test driver specified" % source.test_name,
165                          error_no="00106")
166
167        drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id)
168        if plugin_id is not None:
169            if len(drivers) == 0:
170                ignore_test = source.module_name
171                error_message = "'%s' can not find test driver '%s'" % (
172                    source.test_name, plugin_id)
173                LOG.error(error_message, error_no="00106")
174                if Scheduler.mode == ModeType.decc:
175                    error_message = "Load Error[00106]"
176                    Scheduler.report_not_executed(self.config.report_path, [
177                        ("", test_descriptor)], error_message)
178            else:
179                check_result = False
180                for driver in drivers:
181                    driver_instance = driver.__class__()
182                    device_options = Scheduler.get_device_options(
183                        self.config.__dict__, source)
184                    check_result = driver_instance.__check_environment__(
185                        device_options)
186                    if check_result or check_result is None:
187                        self.test_drivers.append(
188                            (driver_instance, test_descriptor))
189                        break
190                if check_result is False:
191                    LOG.error("'%s' can not find suitable test driver '%s'" %
192                              (source.test_name, plugin_id), error_no="00106")
193        if ignore_test and hasattr(self.config, ConfigConst.component_mapper):
194            getattr(self.config, ConfigConst.component_mapper).pop(ignore_test)
195
196        for desc in test_descriptor.children:
197            self._init_driver(desc)
198
199    @classmethod
200    def _check_report_path(cls, report_path):
201        for _, _, files in os.walk(report_path):
202            for _file in files:
203                if _file.endswith(".xml"):
204                    raise ParamError("xml file exists in '%s'" % report_path,
205                                     error_no="00105")
206
207
208class Request:
209    """
210    Provides the necessary information for TestDriver to execute its tests.
211    """
212
213    def __init__(self, uuid=None, root=None, listeners=None, config=None):
214        self.uuid = uuid
215        self.root = root
216        self.listeners = listeners if listeners else []
217        self.config = config
218
219    def get_listeners(self):
220        return self.listeners
221
222    def get_config(self):
223        return self.config
224
225    def get(self, key=None, default=""):
226        # get value from self.config
227        if not key:
228            return default
229        return getattr(self.config, key, default)
230
231    def get_devices(self):
232        if self.config is None:
233            return []
234        if not hasattr(self.config, "environment"):
235            return []
236        if not hasattr(self.config.environment, "devices"):
237            return []
238        return getattr(self.config.environment, "devices", [])
239
240    def get_config_file(self):
241        return self._get_source_value("config_file")
242
243    def get_source_file(self):
244        return self._get_source_value("source_file")
245
246    def get_test_name(self):
247        return self._get_source_value("test_name")
248
249    def get_source_string(self):
250        return self._get_source_value("source_string")
251
252    def get_test_type(self):
253        return self._get_source_value("test_type")
254
255    def get_module_name(self):
256        return self._get_source_value("module_name")
257
258    def _get_source(self):
259        if not hasattr(self.root, "source"):
260            return ""
261        return getattr(self.root, "source", "")
262
263    def _get_source_value(self, key=None, default=""):
264        if not key:
265            return default
266        source = self._get_source()
267        if not source:
268            return default
269        return getattr(source, key, default)
270