• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import time
21
22from _core.constants import ConfigConst
23from _core.context.handler import report_not_executed
24from _core.context.option_util import get_device_options
25from _core.error import ErrorMessage
26from _core.exception import ParamError
27from _core.executor.source import TestSource
28from _core.logger import platform_logger
29from _core.plugin import Config
30from _core.plugin import Plugin
31from _core.plugin import get_plugin
32from _core.testkit.json_parser import JsonParser
33from _core.utils import get_kit_instances
34from _core.utils import get_repeat_round
35
36__all__ = ["Descriptor", "Task", "Request"]
37LOG = platform_logger("Request")
38
39
40class Descriptor:
41    """
42    The descriptor for a test or suite
43    """
44
45    def __init__(self, uuid=None, name=None, source=None, container=False, error=None):
46        self.unique_id = uuid
47        self.display_name = name
48        self.tags = {}
49        self.source = source
50        self.parent = None
51        self.children = []
52        self.container = container
53        self.error = error
54
55    def get_container(self):
56        return self.container
57
58    def get_unique_id(self):
59        return self.unique_id
60
61
62class Task:
63    """
64    TestTask describes the tree of tests and suites
65    """
66    EMPTY_TASK = "empty"
67    TASK_CONFIG_SUFFIX = ".json"
68    TASK_CONFIG_DIR = "config"
69    life_stage_listener = None
70
71    def __init__(self, root=None, drivers=None, config=None):
72        self.root = root
73        self.test_drivers = drivers or []
74        self.config = config or Config()
75
76    def init(self, config):
77        from xdevice import Variables
78        from _core.context.log import RuntimeLogs
79        import uuid
80        start_time = time.localtime(time.time())
81        LOG.debug("StartTime={}".format(time.strftime("%Y-%m-%d %H:%M:%S", start_time)))
82
83        self.config.update(config.__dict__)
84        Variables.task_id = uuid.uuid4().hex
85        if getattr(config, ConfigConst.report_path, "") == "":
86            Variables.task_name = time.strftime('%Y-%m-%d-%H-%M-%S', start_time)
87        else:
88            Variables.task_name = config.report_path
89
90        # create a report folder to store test report
91        report_path = os.path.join(Variables.exec_dir,
92                                   Variables.report_vars.report_dir,
93                                   Variables.task_name)
94        os.makedirs(report_path, exist_ok=True)
95        self._check_report_path(report_path)
96
97        log_path = os.path.join(report_path, Variables.report_vars.log_dir)
98        os.makedirs(log_path, exist_ok=True)
99
100        self.config.kits = []
101        if getattr(config, "task", ""):
102            task_file = config.task + self.TASK_CONFIG_SUFFIX
103            task_dir = self._get_task_dir(task_file)
104            self._load_task(task_dir, task_file)
105
106        self.config.top_dir = Variables.top_dir
107        self.config.exec_dir = Variables.exec_dir
108        self.config.report_path = report_path
109        self.config.log_path = log_path
110        self.config.start_time = time.strftime("%Y-%m-%d %H:%M:%S", start_time)
111        RuntimeLogs.start_task_log(self.config.log_path)
112        RuntimeLogs.start_encrypt_log(self.config.log_path)
113        LOG.info("Report path: %s", report_path)
114
115    def _get_task_dir(self, task_file):
116        from xdevice import Variables
117        exec_task_dir = os.path.abspath(
118            os.path.join(Variables.exec_dir, self.TASK_CONFIG_DIR))
119        if not os.path.exists(os.path.join(exec_task_dir, task_file)):
120            err_msg = ErrorMessage.Common.Code_0101015.format(task_file)
121            if os.path.normcase(Variables.exec_dir) == \
122                    os.path.normcase(Variables.top_dir):
123                raise ParamError(err_msg)
124
125            top_task_dir = os.path.abspath(
126                os.path.join(Variables.top_dir, self.TASK_CONFIG_DIR))
127            if not os.path.exists(os.path.join(top_task_dir, task_file)):
128                raise ParamError(err_msg)
129            else:
130                return top_task_dir
131        else:
132            return exec_task_dir
133
134    def _load_task(self, task_dir, file_name):
135        task_file = os.path.join(task_dir, file_name)
136        if not os.path.exists(task_file):
137            raise ParamError(ErrorMessage.Common.Code_0101015.format(task_file))
138
139        # add kits to self.config
140        json_config = JsonParser(task_file)
141        kits = get_kit_instances(json_config, self.config.resource_path,
142                                 self.config.testcases_path)
143        self.config.kits.extend(kits)
144
145    def set_root_descriptor(self, root):
146        if not isinstance(root, Descriptor):
147            raise TypeError("need 'Descriptor' type param")
148
149        self.root = root
150        self._init_driver(root)
151        if not self.test_drivers:
152            LOG.error(ErrorMessage.Common.Code_0101016)
153
154    def _init_driver(self, test_descriptor):
155
156        plugin_id = None
157        source = test_descriptor.source
158        ignore_test = ""
159        if isinstance(source, TestSource):
160            if source.test_type is not None:
161                plugin_id = source.test_type
162            else:
163                ignore_test = source.module_name
164                LOG.error(ErrorMessage.Common.Code_0101017.format(source.test_name))
165
166        drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id)
167        if plugin_id is not None:
168            if len(drivers) == 0:
169                ignore_test = source.module_name
170                error_message = ErrorMessage.Common.Code_0101018.format(source.test_name, plugin_id)
171                LOG.error(error_message)
172                report_not_executed(self.config.report_path, [("", test_descriptor)], error_message)
173            else:
174                check_result = False
175                for driver in drivers:
176                    driver_instance = driver.__class__()
177                    device_options = get_device_options(
178                        self.config.__dict__, source)
179                    check_result = driver_instance.__check_environment__(
180                        device_options)
181                    if check_result or check_result is None:
182                        self.test_drivers.append(
183                            (driver_instance, test_descriptor))
184                        break
185                if check_result is False:
186                    LOG.error(ErrorMessage.Common.Code_0101019.format(source.test_name, plugin_id))
187        if ignore_test and hasattr(self.config, ConfigConst.component_mapper):
188            getattr(self.config, ConfigConst.component_mapper).pop(ignore_test)
189
190        for desc in test_descriptor.children:
191            self._init_driver(desc)
192
193    @classmethod
194    def _check_report_path(cls, report_path):
195        for _, _, files in os.walk(report_path):
196            for _file in files:
197                if _file.endswith(".xml"):
198                    raise ParamError(ErrorMessage.Common.Code_0101020)
199
200
201class Request:
202    """
203    Provides the necessary information for TestDriver to execute its tests.
204    """
205
206    def __init__(self, uuid=None, root=None, listeners=None, config=None):
207        self.uuid = uuid
208        self.root = root
209        self.listeners = listeners if listeners else []
210        self.config = config
211
212    def get_listeners(self):
213        return self.listeners
214
215    def get_config(self):
216        return self.config
217
218    def get(self, key=None, default=""):
219        # get value from self.config
220        if not key:
221            return default
222        return getattr(self.config, key, default)
223
224    def get_devices(self):
225        if self.config is None:
226            return []
227        if not hasattr(self.config, "environment"):
228            return []
229        if not hasattr(self.config.environment, "devices"):
230            return []
231        return getattr(self.config.environment, "devices", [])
232
233    def get_config_file(self):
234        return self._get_source_value("config_file")
235
236    def get_source_file(self):
237        return self._get_source_value("source_file")
238
239    def get_test_name(self):
240        return self._get_source_value("test_name")
241
242    def get_source_string(self):
243        return self._get_source_value("source_string")
244
245    def get_test_type(self):
246        return self._get_source_value("test_type")
247
248    def get_module_name(self):
249        return self._get_source_value("module_name")
250
251    def get_repeat_round(self):
252        return get_repeat_round(self.root.unique_id)
253
254    def _get_source(self):
255        if not hasattr(self.root, "source"):
256            return ""
257        return getattr(self.root, "source", "")
258
259    def _get_source_value(self, key=None, default=""):
260        if not key:
261            return default
262        source = self._get_source()
263        if not source:
264            return default
265        return getattr(source, key, default)
266