• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import copy
22import stat
23from collections import namedtuple
24
25from _core.constants import DeviceTestType
26from _core.constants import ModeType
27from _core.constants import HostDrivenTestType
28from _core.constants import ConfigConst
29from _core.exception import ParamError
30from _core.logger import platform_logger
31from _core.utils import get_filename_extension
32from _core.utils import is_config_str
33from _core.utils import unique_id
34
35
36__all__ = ["TestSetSource", "TestSource", "find_test_descriptors",
37           "find_testdict_descriptors", "TestDictSource"]
38
39TestSetSource = namedtuple('TestSetSource', 'set')
40TestSource = namedtuple('TestSource', 'source_file source_string config_file '
41                                      'test_name test_type module_name')
42
43TEST_TYPE_DICT = {"HAP": DeviceTestType.hap_test,
44                  "PYT": HostDrivenTestType.device_test,
45                  "JST": DeviceTestType.jsunit_test,
46                  "CXX": DeviceTestType.cpp_test,
47                  "BIN": DeviceTestType.lite_cpp_test}
48EXT_TYPE_DICT = {".hap": DeviceTestType.hap_test,
49                 ".py": HostDrivenTestType.device_test,
50                 ".js": DeviceTestType.jsunit_test,
51                 ".bin": DeviceTestType.lite_cpp_test,
52                 "default": DeviceTestType.cpp_test}
53PY_SUFFIX = ".py"
54PYD_SUFFIX = ".pyd"
55MODULE_CONFIG_SUFFIX = ".json"
56MODULE_INFO_SUFFIX = ".moduleInfo"
57MAX_DIR_DEPTH = 6
58LOG = platform_logger("TestSource")
59
60
61def find_test_descriptors(config):
62    if not config.testfile and not config.testlist and not config.task and \
63            not config.testcase:
64        return None
65
66    # get test sources
67    testcases_dirs = _get_testcases_dirs(config)
68    test_sources = _get_test_sources(config, testcases_dirs)
69    LOG.debug("test sources: %s", test_sources)
70
71    # normalize test sources
72    test_sources = _normalize_test_sources(testcases_dirs, test_sources,
73                                           config)
74
75    # make test descriptors
76    test_descriptors = _make_test_descriptors_from_testsources(test_sources,
77                                                               config)
78    return test_descriptors
79
80
81def _get_testcases_dirs(config):
82    from xdevice import Variables
83    # add config.testcases_path and its subfolders
84    testcases_dirs = []
85    if getattr(config, ConfigConst.testcases_path, ""):
86        testcases_dirs = [config.testcases_path]
87        _append_subfolders(config.testcases_path, testcases_dirs)
88
89    # add inner testcases dir and its subfolders
90    inner_testcases_dir = os.path.abspath(os.path.join(
91        Variables.top_dir, "testcases"))
92    if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase(
93            config.testcases_path) != os.path.normcase(inner_testcases_dir):
94        testcases_dirs.append(inner_testcases_dir)
95        _append_subfolders(inner_testcases_dir, testcases_dirs)
96
97    # add execution dir and top dir
98    testcases_dirs.append(Variables.exec_dir)
99    if os.path.normcase(Variables.exec_dir) != os.path.normcase(
100            Variables.top_dir):
101        testcases_dirs.append(Variables.top_dir)
102
103    LOG.debug("testcases directories: %s", testcases_dirs)
104    return testcases_dirs
105
106
107def _append_subfolders(testcases_path, testcases_dirs):
108    for root, dirs, _ in os.walk(testcases_path):
109        for sub_dir in dirs:
110            testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir)))
111
112
113def find_testdict_descriptors(config):
114    from xdevice import Variables
115    if getattr(config, ConfigConst.testdict, "") == "":
116        return None
117    testdict = config.testdict
118    test_descriptors = []
119    for test_type_key, files in testdict.items():
120        for file_name in files:
121            if not os.path.isabs(file_name):
122                file_name = os.path.join(Variables.exec_dir, file_name)
123            if os.path.isfile(file_name) and test_type_key in \
124                    TestDictSource.test_type.keys():
125                desc = _make_test_descriptor(os.path.abspath(file_name),
126                                             test_type_key)
127                if desc is not None:
128                    test_descriptors.append(desc)
129    if not test_descriptors:
130        raise ParamError("test source is none", error_no="00110")
131    return test_descriptors
132
133
134def _append_component_test_source(config, testcases_dir, test_sources):
135    subsystem_list = config.subsystems if config.subsystems else list()
136    part_list = config.parts if config.parts else list()
137    module_info_files = _get_component_info_file(testcases_dir)
138    result_dict = dict()
139    for info_file in module_info_files:
140        flags = os.O_RDONLY
141        modes = stat.S_IWUSR | stat.S_IRUSR
142        with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
143            result_dict.update(json.load(f_handler))
144        module_name = result_dict.get("module_name", "")
145        part_name = result_dict.get("part_name", "")
146        subsystem_name = result_dict.get("subsystem", "")
147        if not module_name or not part_name or not subsystem_name:
148            continue
149        module_config_file = \
150            os.path.join(os.path.dirname(info_file), module_name)
151        is_append = True
152        if subsystem_list or part_list:
153            if part_name not in part_list and \
154                    subsystem_name not in subsystem_list:
155                is_append = False
156        if is_append:
157            getattr(config, ConfigConst.component_mapper, dict()).update(
158                {module_name: (subsystem_name, part_name)})
159            test_sources.append(module_config_file)
160
161
162def _get_test_sources(config, testcases_dirs):
163    test_sources = []
164
165    # get test sources from testcases_dirs
166    if not config.testfile and not config.testlist and not config.testcase \
167            and not config.subsystems and not config.parts and not \
168            getattr(config, ConfigConst.component_base_kit, "") and\
169            config.task:
170        for testcases_dir in testcases_dirs:
171            _append_module_test_source(testcases_dir, test_sources)
172        return test_sources
173
174    # get test sources from config.testlist
175    if getattr(config, ConfigConst.testlist, ""):
176        for test_source in config.testlist.split(";"):
177            if test_source.strip():
178                test_sources.append(test_source.strip())
179        return test_sources
180
181    # get test sources from config.testfile
182    if getattr(config, ConfigConst.testfile, ""):
183        test_file = _get_test_file(config, testcases_dirs)
184        flags = os.O_RDONLY
185        modes = stat.S_IWUSR | stat.S_IRUSR
186        with os.fdopen(os.open(test_file, flags, modes), "r") as file_content:
187            for line in file_content:
188                if line.strip():
189                    test_sources.append(line.strip())
190
191    # get test sources from config.testcase
192    if getattr(config, ConfigConst.testcase, ""):
193        for test_source in config.testcase.split(";"):
194            if test_source.strip():
195                test_sources.append(test_source.strip())
196        return test_sources
197
198    # get test sources according *.moduleInfo file
199    if getattr(config, ConfigConst.subsystems, []) or getattr(
200            config, ConfigConst.parts, []) or \
201            getattr(config, ConfigConst.component_base_kit, ""):
202        setattr(config, ConfigConst.component_mapper, dict())
203        for testcases_dir in testcases_dirs:
204            _append_component_test_source(config, testcases_dir, test_sources)
205        return test_sources
206    return test_sources
207
208
209def _append_module_test_source(testcases_path, test_sources):
210    if not os.path.isdir(testcases_path):
211        return
212    for item in os.listdir(testcases_path):
213        item_path = os.path.join(testcases_path, item)
214        if os.path.isfile(item_path) and item_path.endswith(
215                MODULE_CONFIG_SUFFIX):
216            test_sources.append(item_path)
217
218
219def _get_test_file(config, testcases_dirs):
220    if os.path.isabs(config.testfile):
221        if os.path.exists(config.testfile):
222            return config.testfile
223        else:
224            raise ParamError("test file '%s' not exists" % config.testfile,
225                             error_no="00110")
226
227    for testcases_dir in testcases_dirs:
228        test_file = os.path.join(testcases_dir, config.testfile)
229        if os.path.exists(test_file):
230            return test_file
231
232    raise ParamError("test file '%s' not exists" % config.testfile)
233
234
235def _normalize_test_sources(testcases_dirs, test_sources, config):
236    norm_test_sources = []
237    for test_source in test_sources:
238        append_result = False
239        for testcases_dir in testcases_dirs:
240            # append test source absolute path
241            append_result = _append_norm_test_source(
242                norm_test_sources, test_source, testcases_dir, config)
243            if append_result:
244                break
245
246        # append test source if no corresponding file founded
247        if not append_result:
248            norm_test_sources.append(test_source)
249    if not norm_test_sources:
250        raise ParamError("test source not found")
251    return norm_test_sources
252
253
254def _append_norm_test_source(norm_test_sources, test_source, testcases_dir,
255                             config):
256    # get norm_test_source
257    norm_test_source = test_source
258    if not os.path.isabs(test_source):
259        norm_test_source = os.path.abspath(
260            os.path.join(testcases_dir, test_source))
261
262    # find py or pyd for test case input
263    if config.testcase and not config.testlist:
264        if os.path.isfile("%s%s" % (norm_test_source, PY_SUFFIX)):
265            norm_test_sources.append(
266                "%s%s" % (norm_test_source, PY_SUFFIX))
267            return True
268        elif os.path.isfile("%s%s" % (norm_test_source, PYD_SUFFIX)):
269            norm_test_sources.append(
270                "%s%s" % (norm_test_source, PYD_SUFFIX))
271            return True
272        return False
273
274    # append to norm_test_sources
275    if os.path.isfile(norm_test_source):
276        norm_test_sources.append(norm_test_source)
277        return True
278    elif os.path.isfile("%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX)):
279        norm_test_sources.append("%s%s" % (norm_test_source,
280                                           MODULE_CONFIG_SUFFIX))
281        return True
282    return False
283
284
285def _make_test_descriptor(file_path, test_type_key):
286    from _core.executor.request import Descriptor
287    if test_type_key is None:
288        return None
289
290    # get params
291    filename, _ = get_filename_extension(file_path)
292    uid = unique_id("TestSource", filename)
293    test_type = TestDictSource.test_type[test_type_key]
294    config_file = _get_config_file(
295        os.path.join(os.path.dirname(file_path), filename))
296
297    module_name = _parse_module_name(config_file, filename)
298    # make test descriptor
299    desc = Descriptor(uuid=uid, name=filename,
300                      source=TestSource(file_path, "", config_file, filename,
301                                        test_type, module_name))
302    return desc
303
304
305def _get_test_driver(test_source):
306    try:
307        from _core.testkit.json_parser import JsonParser
308        json_config = JsonParser(test_source)
309        return json_config.get_driver_type()
310    except ParamError as error:
311        LOG.error(error, error_no=error.error_no)
312        return ""
313
314
315def _make_test_descriptors_from_testsources(test_sources, config):
316    test_descriptors = []
317
318    for test_source in test_sources:
319        filename, ext = test_source.split()[0], "str"
320        if os.path.isfile(test_source):
321            filename, ext = get_filename_extension(test_source)
322
323        test_driver = config.testdriver
324        if is_config_str(test_source):
325            test_driver = _get_test_driver(test_source)
326
327        # get params
328        config_file = _get_config_file(
329            os.path.join(os.path.dirname(test_source), filename), ext, config)
330        test_type = _get_test_type(config_file, test_driver, ext)
331
332        desc = _create_descriptor(config_file, filename, test_source,
333                                  test_type, config)
334        if desc:
335            test_descriptors.append(desc)
336
337    return test_descriptors
338
339
340def _create_descriptor(config_file, filename, test_source, test_type, config):
341    from xdevice import Scheduler
342    from _core.executor.request import Descriptor
343
344    error_message = ""
345    if not test_type:
346        error_message = "no driver to execute '%s'" % test_source
347        LOG.error(error_message, error_no="00112")
348        if Scheduler.mode != ModeType.decc:
349            return None
350
351    # create Descriptor
352    uid = unique_id("TestSource", filename)
353    module_name = _parse_module_name(config_file, filename)
354    desc = Descriptor(uuid=uid, name=filename,
355                      source=TestSource(test_source, "", config_file,
356                                        filename, test_type, module_name))
357    if not os.path.isfile(test_source):
358        if is_config_str(test_source):
359            desc = Descriptor(uuid=uid, name=filename,
360                              source=TestSource("", test_source, config_file,
361                                                filename, test_type,
362                                                module_name))
363        else:
364            if config.testcase and not config.testlist:
365                error_message = "test case '%s' or '%s' not exists" % (
366                        "%s%s" % (test_source, PY_SUFFIX), "%s%s" % (
367                            test_source, PYD_SUFFIX))
368                error_no = "00103"
369            else:
370                error_message = "test source '%s' or '%s' not exists" % (
371                    test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX))
372                error_no = "00102"
373            if Scheduler.mode != ModeType.decc:
374                raise ParamError(error_message, error_no=error_no)
375
376    if Scheduler.mode == ModeType.decc and error_message:
377        Scheduler.report_not_executed(config.report_path, [("", desc)],
378                                      error_message)
379        return None
380
381    return desc
382
383
384def _get_config_file(filename, ext=None, config=None):
385    config_file = None
386    if os.path.exists("%s%s" % (filename, MODULE_CONFIG_SUFFIX)):
387        config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX)
388        return config_file
389    if ext and os.path.exists("%s%s%s" % (filename, ext,
390                                          MODULE_CONFIG_SUFFIX)):
391        config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX)
392        return config_file
393    if config and getattr(config, "testcase", "") and not getattr(
394            config, "testlist"):
395        return _get_testcase_config_file(filename)
396
397    return config_file
398
399
400def _get_testcase_config_file(filename):
401    depth = 1
402    dirname = os.path.dirname(filename)
403    while dirname and depth < MAX_DIR_DEPTH:
404        for item in os.listdir(dirname):
405            item_path = os.path.join(dirname, item)
406            if os.path.isfile(item_path) and item.endswith(
407                    MODULE_CONFIG_SUFFIX):
408                return item_path
409        depth += 1
410        dirname = os.path.dirname(dirname)
411    return None
412
413
414def _get_component_info_file(entry_dir):
415    module_files = []
416    if not os.path.isdir(entry_dir):
417        return module_files
418    for item in os.listdir(entry_dir):
419        item_path = os.path.join(entry_dir, item)
420        if os.path.isfile(item_path) and item_path.endswith(
421                MODULE_INFO_SUFFIX):
422            module_files.append(item_path)
423    return module_files
424
425
426def _get_test_type(config_file, test_driver, ext):
427    if test_driver:
428        return test_driver
429
430    if config_file:
431        if not os.path.exists(config_file):
432            LOG.error("config file '%s' not exists" % config_file,
433                      error_no="00110")
434            return ""
435        return _get_test_driver(config_file)
436    if ext in [".py", ".js", ".dex", ".hap", ".bin"] \
437            and ext in TestDictSource.exe_type.keys():
438        test_type = TestDictSource.exe_type[ext]
439    elif ext in TestDictSource.exe_type.keys():
440        test_type = DeviceTestType.hap_test
441    else:
442        test_type = DeviceTestType.cpp_test
443    return test_type
444
445
446def _parse_module_name(config_file, file_name):
447    if config_file:
448        return get_filename_extension(config_file)[0]
449    else:
450        if "{" in file_name:
451            return "report"
452        return file_name
453
454
455class TestDictSource:
456    exe_type = dict()
457    test_type = dict()
458
459    @classmethod
460    def reset(cls):
461        cls.test_type = copy.deepcopy(TEST_TYPE_DICT)
462        cls.exe_type = copy.deepcopy(EXT_TYPE_DICT)
463