• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import copy
22import stat
23from collections import namedtuple
24
25from _core.constants import DeviceTestType
26from _core.constants import ModeType
27from _core.constants import HostDrivenTestType
28from _core.constants import FilePermission
29from _core.constants import ConfigConst
30from _core.exception import ParamError
31from _core.logger import platform_logger
32from _core.utils import get_filename_extension
33from _core.utils import is_config_str
34from _core.utils import unique_id
35
# Names exported by ``from <this module> import *``.
__all__ = ["TestSetSource", "TestSource", "find_test_descriptors",
           "find_testdict_descriptors", "TestDictSource"]

# Lightweight records describing a test source; TestSource is the value
# stored in the ``source`` argument of every Descriptor built in this module.
TestSetSource = namedtuple('TestSetSource', 'set')
TestSource = namedtuple('TestSource', 'source_file source_string config_file '
                                      'test_name test_type module_name')

# Test-type key -> driver type. Copied into TestDictSource.test_type by
# TestDictSource.reset() and looked up in find_testdict_descriptors().
TEST_TYPE_DICT = {"DEX": DeviceTestType.dex_test,
                  "HAP": DeviceTestType.hap_test,
                  "APK": DeviceTestType.hap_test,
                  "PYT": HostDrivenTestType.device_test,
                  "JST": DeviceTestType.jsunit_test,
                  "OHJST": DeviceTestType.oh_jsunit_test,
                  "CXX": DeviceTestType.cpp_test,
                  "BIN": DeviceTestType.lite_cpp_test}
# File extension -> driver type. Copied into TestDictSource.exe_type by
# TestDictSource.reset() and looked up in _get_test_type().
EXT_TYPE_DICT = {".dex": DeviceTestType.dex_test,
                 ".hap": DeviceTestType.hap_test,
                 ".apk": DeviceTestType.hap_test,
                 ".py": HostDrivenTestType.device_test,
                 ".js": DeviceTestType.jsunit_test,
                 ".bin": DeviceTestType.lite_cpp_test,
                 "default": DeviceTestType.cpp_test}
# Suffixes tried when resolving a test case name to a script file.
PY_SUFFIX = ".py"
PYD_SUFFIX = ".pyd"
# Suffix of a module configuration file.
MODULE_CONFIG_SUFFIX = ".json"
# Suffix of a component description file (see _get_component_info_file).
MODULE_INFO_SUFFIX = ".moduleInfo"
# Upper bound on the directory levels visited by _get_testcase_config_file.
MAX_DIR_DEPTH = 6
LOG = platform_logger("TestSource")
64
65
def find_test_descriptors(config):
    """Build the test descriptors selected by *config*.

    Returns None when none of ``testfile``, ``testlist``, ``task``,
    ``testcase``, ``subsystems`` or ``parts`` is set on *config*; otherwise
    returns the descriptors produced from the resolved test sources.
    """
    selector_names = ("testfile", "testlist", "task", "testcase",
                      "subsystems", "parts")
    # Lazy evaluation: stop at the first selector that is set.
    if not any(getattr(config, name) for name in selector_names):
        return None

    # collect the raw sources from every candidate directory
    search_dirs = _get_testcases_dirs(config)
    raw_sources = _get_test_sources(config, search_dirs)
    LOG.debug("Test sources: %s", raw_sources)

    # resolve them to absolute paths where a matching file exists
    resolved_sources = _normalize_test_sources(search_dirs, raw_sources,
                                               config)

    # turn every resolved source into a descriptor
    return _make_test_descriptors_from_testsources(resolved_sources, config)
85
86
def _get_testcases_dirs(config):
    """Return the ordered list of directories searched for test sources.

    Order: the configured testcases path and its subfolders, then the inner
    ``testcases`` directory under ``Variables.top_dir`` and its subfolders
    (only when a testcases path is configured and differs from it), then the
    execution directory, then the top directory when it differs from the
    execution directory.
    """
    from xdevice import Variables
    search_dirs = []
    inner_dir = os.path.abspath(os.path.join(Variables.top_dir, "testcases"))

    if getattr(config, ConfigConst.testcases_path, ""):
        # configured testcases path and everything below it
        custom_dir = config.testcases_path
        search_dirs.append(custom_dir)
        _append_subfolders(custom_dir, search_dirs)

        # inner testcases dir, unless it is the configured one
        if os.path.normcase(custom_dir) != os.path.normcase(inner_dir):
            search_dirs.append(inner_dir)
            _append_subfolders(inner_dir, search_dirs)

    # execution dir, then top dir when it is a different location
    exec_dir = Variables.exec_dir
    search_dirs.append(exec_dir)
    if os.path.normcase(exec_dir) != os.path.normcase(Variables.top_dir):
        search_dirs.append(Variables.top_dir)

    LOG.debug("Testcases directories: %s", search_dirs)
    return search_dirs
111
112
def _append_subfolders(testcases_path, testcases_dirs):
    """Append the absolute path of every directory below *testcases_path*.

    The tree is walked with ``os.walk``; *testcases_dirs* is extended in
    place and nothing is returned.
    """
    for parent, child_names, _files in os.walk(testcases_path):
        testcases_dirs.extend(
            os.path.abspath(os.path.join(parent, child))
            for child in child_names)
117
118
def find_testdict_descriptors(config):
    """Build descriptors from the ``testdict`` mapping on *config*.

    ``config.testdict`` maps a test-type key to a collection of file names.
    Relative names are resolved against ``Variables.exec_dir``. Entries whose
    file does not exist or whose key is not registered in
    ``TestDictSource.test_type`` are skipped.

    Returns None when the testdict attribute is missing or an empty string.
    Raises ParamError (error_no "00110") when no descriptor could be built.
    """
    from xdevice import Variables
    if getattr(config, ConfigConst.testdict, "") == "":
        return None

    descriptors = []
    for type_key, file_names in config.testdict.items():
        for name in file_names:
            if os.path.isabs(name):
                path = name
            else:
                path = os.path.join(Variables.exec_dir, name)
            if not os.path.isfile(path):
                continue
            if type_key not in TestDictSource.test_type.keys():
                continue
            descriptor = _make_test_descriptor(os.path.abspath(path),
                                               type_key)
            if descriptor is not None:
                descriptors.append(descriptor)

    if not descriptors:
        raise ParamError("test source is none", error_no="00110")
    return descriptors
138
139
def _append_component_test_source(config, testcases_dir, test_sources):
    """Append the module sources described by ``.moduleInfo`` files.

    Every component info file directly inside *testcases_dir* is parsed as
    JSON. A file contributes a source only when it defines non-empty
    "module", "part" and "subsystem" values and, if ``config.subsystems`` or
    ``config.parts`` is set, its part or its subsystem is among the requested
    ones. The appended source is ``<info file directory>/<module>`` and the
    mapping ``module -> (subsystem, part)`` is recorded in the component
    mapper stored on *config*.

    Each file is parsed into its own dictionary so that values from a
    previously read file can never stand in for keys missing in a later one.
    """
    subsystem_list = config.subsystems if config.subsystems else list()
    part_list = config.parts if config.parts else list()
    filter_active = bool(subsystem_list or part_list)

    for info_file in _get_component_info_file(testcases_dir):
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
            module_info = json.load(f_handler)
        if not isinstance(module_info, dict):
            # not a JSON object: nothing usable in this file
            continue

        module_name = module_info.get("module", "")
        part_name = module_info.get("part", "")
        subsystem_name = module_info.get("subsystem", "")
        if not module_name or not part_name or not subsystem_name:
            continue

        if filter_active and part_name not in part_list and \
                subsystem_name not in subsystem_list:
            continue

        getattr(config, ConfigConst.component_mapper, dict()).update(
            {module_name: (subsystem_name, part_name)})
        test_sources.append(
            os.path.join(os.path.dirname(info_file), module_name))
166
167
def _get_test_sources(config, testcases_dirs):
    """Collect the raw test source strings selected by *config*.

    Selection order:
      1. only ``task`` set: every module config file found directly in each
         of *testcases_dirs*;
      2. ``testlist``: its ';'-separated entries;
      3. ``testfile``: the non-blank lines of that file (processing then
         continues with the following steps);
      4. ``testcase``: its ';'-separated entries;
      5. ``subsystems`` / ``parts`` / component base kit: sources described
         by the component info files in each of *testcases_dirs*.
    """

    def _non_blank(chunks):
        # stripped entries, blank ones dropped
        return [chunk.strip() for chunk in chunks if chunk.strip()]

    collected = []

    # get test sources from testcases_dirs
    explicit_selection = (
        config.testfile or config.testlist or config.testcase
        or config.subsystems or config.parts
        or getattr(config, ConfigConst.component_base_kit, ""))
    if not explicit_selection and config.task:
        for directory in testcases_dirs:
            _append_module_test_source(directory, collected)
        return collected

    # get test sources from config.testlist
    if getattr(config, ConfigConst.testlist, ""):
        collected.extend(_non_blank(config.testlist.split(";")))
        return collected

    # get test sources from config.testfile
    if getattr(config, ConfigConst.testfile, ""):
        test_file = _get_test_file(config, testcases_dirs)
        file_descriptor = os.open(test_file, os.O_RDONLY,
                                  stat.S_IWUSR | stat.S_IRUSR)
        with os.fdopen(file_descriptor, "r") as file_content:
            collected.extend(_non_blank(file_content))

    # get test sources from config.testcase
    if getattr(config, ConfigConst.testcase, ""):
        collected.extend(_non_blank(config.testcase.split(";")))
        return collected

    # get test sources from component info files
    component_selection = (
        getattr(config, ConfigConst.subsystems, [])
        or getattr(config, ConfigConst.parts, [])
        or getattr(config, ConfigConst.component_base_kit, ""))
    if component_selection:
        setattr(config, ConfigConst.component_mapper, dict())
        for directory in testcases_dirs:
            _append_component_test_source(config, directory, collected)
    return collected
212
213
def _append_module_test_source(testcases_path, test_sources):
    """Append every module config file located directly in *testcases_path*.

    Does nothing when *testcases_path* is not a directory. Sub-directories
    are not descended into; *test_sources* is extended in place.
    """
    if os.path.isdir(testcases_path):
        entry_paths = (os.path.join(testcases_path, entry)
                       for entry in os.listdir(testcases_path))
        test_sources.extend(
            path for path in entry_paths
            if os.path.isfile(path) and path.endswith(MODULE_CONFIG_SUFFIX))
222
223
def _get_test_file(config, testcases_dirs):
    """Resolve ``config.testfile`` to an existing path.

    An absolute path is used as is. A relative path is looked up in each of
    *testcases_dirs*, in order, and the first existing match is returned.

    Raises ParamError (error_no "00110") when the file cannot be found; both
    failure paths report the same error number.
    """
    test_file = config.testfile
    if os.path.isabs(test_file):
        if os.path.exists(test_file):
            return test_file
        raise ParamError("test file '%s' not exists" % test_file,
                         error_no="00110")

    for testcases_dir in testcases_dirs:
        candidate = os.path.join(testcases_dir, test_file)
        if os.path.exists(candidate):
            return candidate

    raise ParamError("test file '%s' not exists" % test_file,
                     error_no="00110")
238
239
def _normalize_test_sources(testcases_dirs, test_sources, config):
    """Resolve each test source against *testcases_dirs*.

    For every source the directories are tried in order and the first
    resolution that succeeds is kept. A source that cannot be resolved in any
    directory is kept unchanged.

    Raises ParamError when the resulting list is empty.
    """
    resolved = []
    for source in test_sources:
        for directory in testcases_dirs:
            # stop at the first directory that yields an existing file
            if _append_norm_test_source(resolved, source, directory, config):
                break
        else:
            # no corresponding file found: keep the source as given
            resolved.append(source)

    if not resolved:
        raise ParamError("test source not found")
    return resolved
257
258
def _append_norm_test_source(norm_test_sources, test_source, testcases_dir,
                             config):
    """Try to resolve *test_source* inside *testcases_dir*.

    A relative source is joined to *testcases_dir* and made absolute. When
    ``config.testcase`` is set without ``config.testlist`` only the script
    variants (source + ".py", then source + ".pyd") are considered; otherwise
    the path itself and then the path with the module config suffix are
    tried. The first existing file is appended to *norm_test_sources*.

    Returns True when a file was appended, False otherwise.
    """
    if os.path.isabs(test_source):
        candidate = test_source
    else:
        candidate = os.path.abspath(os.path.join(testcases_dir, test_source))

    # test case input: look for a py or pyd script only
    if config.testcase and not config.testlist:
        for suffix in (PY_SUFFIX, PYD_SUFFIX):
            script_path = "%s%s" % (candidate, suffix)
            if os.path.isfile(script_path):
                norm_test_sources.append(script_path)
                return True
        return False

    # any other input: the file itself, else its module config
    if not os.path.isfile(candidate):
        candidate = "%s%s" % (candidate, MODULE_CONFIG_SUFFIX)
        if not os.path.isfile(candidate):
            return False
    norm_test_sources.append(candidate)
    return True
288
289
def _make_test_descriptor(file_path, test_type_key):
    """Create a Descriptor for *file_path* with an explicit test-type key.

    Returns None when *test_type_key* is None. The driver type is read from
    ``TestDictSource.test_type`` and the config file is looked up next to
    the test file, using the file name without its extension.
    """
    from _core.executor.request import Descriptor
    if test_type_key is None:
        return None

    # gather the descriptor parameters
    stem = get_filename_extension(file_path)[0]
    uid = unique_id("TestSource", stem)
    driver_type = TestDictSource.test_type[test_type_key]
    config_file = _get_config_file(
        os.path.join(os.path.dirname(file_path), stem))
    module_name = _parse_module_name(config_file, stem)

    # assemble the descriptor
    source = TestSource(source_file=file_path, source_string="",
                        config_file=config_file, test_name=stem,
                        test_type=driver_type, module_name=module_name)
    return Descriptor(uuid=uid, name=stem, source=source)
308
309
def _get_test_driver(test_source):
    """Return the driver type declared by the JSON config *test_source*.

    *test_source* is handed to JsonParser as is. A ParamError raised while
    parsing is logged and an empty string is returned instead.
    """
    driver_type = ""
    try:
        from _core.testkit.json_parser import JsonParser
        driver_type = JsonParser(test_source).get_driver_type()
    except ParamError as error:
        LOG.error(error, error_no=error.error_no)
    return driver_type
318
319
def _make_test_descriptors_from_testsources(test_sources, config):
    """Create a descriptor for each entry of *test_sources*.

    For an existing file the name and extension come from the file; for any
    other source the first whitespace-separated token is used as the name and
    "str" as the extension. When no config file is found for a test case
    input (``testcase`` set, ``testlist`` not set) and the Scheduler has
    device labels, a temporary JSON config is generated and remembered on
    ``Scheduler.tmp_json``. Sources for which no descriptor could be created
    are left out.
    """
    descriptors = []

    for source in test_sources:
        name, ext = source.split()[0], "str"
        if os.path.isfile(source):
            name, ext = get_filename_extension(source)

        # an inline config string declares its own driver
        driver = config.testdriver
        if is_config_str(source):
            driver = _get_test_driver(source)

        # get params
        base_path = os.path.join(os.path.dirname(source), name)
        config_file = _get_config_file(base_path, ext, config)
        test_type = _get_test_type(config_file, driver, ext)

        if not config_file and getattr(config, ConfigConst.testcase, "") \
                and not getattr(config, ConfigConst.testlist):
            LOG.debug("Can't find the json file of config")
            from xdevice import Scheduler
            if Scheduler.device_labels:
                config_file, test_type = _generate_config_file(
                    Scheduler.device_labels, base_path, ext, test_type)
                setattr(Scheduler, "tmp_json", config_file)
                LOG.debug("Generate temp json success: %s" % config_file)

        descriptor = _create_descriptor(config_file, name, source, test_type,
                                        config)
        if descriptor:
            descriptors.append(descriptor)

    return descriptors
354
355
def _create_descriptor(config_file, filename, test_source, test_type, config):
    """Create the Descriptor for one test source, or report why it cannot run.

    Outside decc mode a missing driver type yields None and a missing test
    source raises ParamError ("00103" for test case input, "00102"
    otherwise). In decc mode neither aborts: the error message is passed to
    ``Scheduler.report_not_executed`` together with the descriptor and None
    is returned.

    Returns the Descriptor when no error was recorded, otherwise None.
    """
    from xdevice import Scheduler
    from _core.executor.request import Descriptor

    error_message = ""
    if not test_type:
        error_message = "no driver to execute '%s'" % test_source
        LOG.error(error_message, error_no="00112")
        # decc mode keeps going so the failure can be reported below
        if Scheduler.mode != ModeType.decc:
            return None

    # create Descriptor
    uid = unique_id("TestSource", filename)
    module_name = _parse_module_name(config_file, filename)
    # default: the source is a file path (stored in source_file)
    desc = Descriptor(uuid=uid, name=filename,
                      source=TestSource(test_source, "", config_file,
                                        filename, test_type, module_name))
    if not os.path.isfile(test_source):
        if is_config_str(test_source):
            # inline config string: stored in source_string instead
            desc = Descriptor(uuid=uid, name=filename,
                              source=TestSource("", test_source, config_file,
                                                filename, test_type,
                                                module_name))
        else:
            # neither a file nor a config string: the source does not exist
            if config.testcase and not config.testlist:
                error_message = "test case '%s' or '%s' not exists" % (
                        "%s%s" % (test_source, PY_SUFFIX), "%s%s" % (
                            test_source, PYD_SUFFIX))
                error_no = "00103"
            else:
                error_message = "test source '%s' or '%s' not exists" % (
                    test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX))
                error_no = "00102"
            if Scheduler.mode != ModeType.decc:
                raise ParamError(error_message, error_no=error_no)

    # decc mode: report the recorded error instead of returning a descriptor
    if Scheduler.mode == ModeType.decc and error_message:
        Scheduler.report_not_executed(config.report_path, [("", desc)],
                                      error_message)
        return None

    return desc
398
399
def _get_config_file(filename, ext=None, config=None):
    """Locate the JSON config file belonging to *filename*.

    Tried in order: ``<filename>.json``; ``<filename><ext>.json`` when *ext*
    is given; and, for test case input (``config.testcase`` set and
    ``config.testlist`` not set), the first config file found by
    ``_get_testcase_config_file``.

    Returns the path found, or None.
    """
    plain_candidate = "%s%s" % (filename, MODULE_CONFIG_SUFFIX)
    if os.path.exists(plain_candidate):
        return plain_candidate

    if ext:
        ext_candidate = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX)
        if os.path.exists(ext_candidate):
            return ext_candidate

    testcase_only = config and getattr(config, "testcase", "") and \
        not getattr(config, "testlist")
    if testcase_only:
        return _get_testcase_config_file(filename)
    return None
414
415
def _get_testcase_config_file(filename):
    """Search the directory of *filename* and its ancestors for a config file.

    Starting at ``os.path.dirname(filename)`` and moving upwards, at most
    ``MAX_DIR_DEPTH - 1`` directories are inspected. In each one the entries
    are examined in sorted order, so the pick is deterministic when several
    config files exist. A directory that is missing or cannot be listed is
    skipped instead of aborting the search, and the walk stops at the
    filesystem root.

    Returns the path of the first regular file whose name ends with the
    module config suffix, or None when there is none.
    """
    depth = 1
    dirname = os.path.dirname(filename)
    while dirname and depth < MAX_DIR_DEPTH:
        try:
            entries = sorted(os.listdir(dirname))
        except OSError:
            # missing or unreadable directory: nothing to find at this level
            entries = []
        for item in entries:
            item_path = os.path.join(dirname, item)
            if item.endswith(MODULE_CONFIG_SUFFIX) and \
                    os.path.isfile(item_path):
                return item_path

        parent = os.path.dirname(dirname)
        if parent == dirname:
            # reached the root; going further would rescan the same place
            break
        depth += 1
        dirname = parent
    return None
428
429
def _get_component_info_file(entry_dir):
    """List the component info files located directly in *entry_dir*.

    Returns the paths of the regular files whose name ends with the module
    info suffix; an empty list when *entry_dir* is not a directory.
    """
    if not os.path.isdir(entry_dir):
        return []
    entry_paths = (os.path.join(entry_dir, entry)
                   for entry in os.listdir(entry_dir))
    return [path for path in entry_paths
            if os.path.isfile(path) and path.endswith(MODULE_INFO_SUFFIX)]
440
441
def _get_test_type(config_file, test_driver, ext):
    """Determine the driver type used to run a test source.

    Precedence: an explicit *test_driver*; the driver declared in
    *config_file* (an empty string when that file does not exist); the type
    registered for *ext* in ``TestDictSource.exe_type``; finally the C++ test
    type.
    """
    if test_driver:
        return test_driver

    if config_file:
        if os.path.exists(config_file):
            return _get_test_driver(config_file)
        LOG.error("Config file '%s' not exists" % config_file,
                  error_no="00110")
        return ""

    registered = TestDictSource.exe_type
    if ext in (".py", ".js", ".dex", ".hap", ".bin") and ext in registered:
        return registered[ext]
    if ext == ".apk" and ext in registered:
        return DeviceTestType.hap_test
    return DeviceTestType.cpp_test
460
461
def _parse_module_name(config_file, file_name):
    """Derive the module name of a test source.

    With a config file the name is that file's name without extension.
    Without one, a *file_name* containing "{" yields "report" and any other
    *file_name* is returned unchanged.
    """
    if config_file:
        return get_filename_extension(config_file)[0]
    return "report" if "{" in file_name else file_name
469
470
def _generate_config_file(device_labels, filename, ext, test_type):
    """Write a temporary JSON config for a host-driven test script.

    The config holds one "device" environment entry per label in
    *device_labels* and a driver section pointing at ``<filename><ext>``. The
    driver type is always the host-driven device test type, whatever
    *test_type* was passed in. The file is written to
    ``<dirname(filename)>/<basename(filename)>.json`` and truncated first, so
    rewriting an existing, longer file cannot leave stale bytes behind.

    Returns a ``(config file path, test type)`` tuple.
    """
    test_type = HostDrivenTestType.device_test
    top_dict = {
        "environment": [{"type": "device", "label": label}
                        for label in device_labels],
        "driver": {"type": test_type,
                   "py_file": "%s%s" % (filename, ext)},
    }

    save_file = os.path.join(os.path.dirname(filename),
                             "%s.json" % os.path.basename(filename))
    # O_TRUNC: discard previous content instead of overwriting in place
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    save_file_open = os.open(save_file, flags, FilePermission.mode_755)
    with os.fdopen(save_file_open, "w") as save_handler:
        save_handler.write(json.dumps(top_dict, indent=4))
    return save_file, test_type
487
488
class TestDictSource:
    """Class-level registry of the supported test types.

    Both tables start out empty and are filled by ``reset``.
    """
    # file extension -> driver type
    exe_type = {}
    # test-type key -> driver type
    test_type = {}

    @classmethod
    def reset(cls):
        """Replace both tables with fresh deep copies of the defaults."""
        cls.exe_type = copy.deepcopy(EXT_TYPE_DICT)
        cls.test_type = copy.deepcopy(TEST_TYPE_DICT)
497