• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import copy
22import stat
23from collections import namedtuple
24
25from _core.constants import DeviceTestType
26from _core.constants import ModeType
27from _core.constants import HostDrivenTestType
28from _core.constants import FilePermission
29from _core.constants import ConfigConst
30from _core.exception import ParamError
31from _core.logger import platform_logger
32from _core.utils import get_filename_extension
33from _core.utils import is_config_str
34from _core.utils import unique_id
35
36__all__ = ["TestSetSource", "TestSource", "find_test_descriptors",
37           "find_testdict_descriptors", "TestDictSource"]
38
39TestSetSource = namedtuple('TestSetSource', 'set')
40TestSource = namedtuple('TestSource', 'source_file source_string config_file '
41                                      'test_name test_type module_name')
42
43TEST_TYPE_DICT = {"DEX": DeviceTestType.dex_test,
44                  "HAP": DeviceTestType.hap_test,
45                  "APK": DeviceTestType.hap_test,
46                  "PYT": HostDrivenTestType.device_test,
47                  "JST": DeviceTestType.jsunit_test,
48                  "OHJST": DeviceTestType.oh_jsunit_test,
49                  "CXX": DeviceTestType.cpp_test,
50                  "BIN": DeviceTestType.lite_cpp_test,
51                  "LTPPosix": DeviceTestType.ltp_posix_test}
52EXT_TYPE_DICT = {".dex": DeviceTestType.dex_test,
53                 ".hap": DeviceTestType.hap_test,
54                 ".apk": DeviceTestType.hap_test,
55                 ".py": HostDrivenTestType.device_test,
56                 ".js": DeviceTestType.jsunit_test,
57                 ".bin": DeviceTestType.lite_cpp_test,
58                 "default": DeviceTestType.cpp_test}
59PY_SUFFIX = ".py"
60PYD_SUFFIX = ".pyd"
61MODULE_CONFIG_SUFFIX = ".json"
62MODULE_INFO_SUFFIX = ".moduleInfo"
63MAX_DIR_DEPTH = 6
64LOG = platform_logger("TestSource")
65
66
67def find_test_descriptors(config):
68    if not config.testfile and not config.testlist and not config.task and \
69            not config.testcase and not config.subsystems and \
70            not config.parts:
71        return None
72
73    # get test sources
74    testcases_dirs = _get_testcases_dirs(config)
75    test_sources = _get_test_sources(config, testcases_dirs)
76    LOG.debug("Test sources: %s", test_sources)
77
78    # normalize test sources
79    test_sources = _normalize_test_sources(testcases_dirs, test_sources,
80                                           config)
81
82    # make test descriptors
83    test_descriptors = _make_test_descriptors_from_testsources(test_sources,
84                                                               config)
85    return test_descriptors
86
87
88def _get_testcases_dirs(config):
89    from xdevice import Variables
90    # add config.testcases_path and its subfolders
91    testcases_dirs = []
92    if getattr(config, ConfigConst.testcases_path, ""):
93        testcases_dirs = [config.testcases_path]
94        _append_subfolders(config.testcases_path, testcases_dirs)
95
96    # add inner testcases dir and its subfolders
97    inner_testcases_dir = os.path.abspath(os.path.join(
98        Variables.top_dir, "testcases"))
99    if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase(
100            config.testcases_path) != os.path.normcase(inner_testcases_dir):
101        testcases_dirs.append(inner_testcases_dir)
102        _append_subfolders(inner_testcases_dir, testcases_dirs)
103
104    # add execution dir and top dir
105    testcases_dirs.append(Variables.exec_dir)
106    if os.path.normcase(Variables.exec_dir) != os.path.normcase(
107            Variables.top_dir):
108        testcases_dirs.append(Variables.top_dir)
109
110    LOG.debug("Testcases directories: %s", testcases_dirs)
111    return testcases_dirs
112
113
114def _append_subfolders(testcases_path, testcases_dirs):
115    for root, dirs, _ in os.walk(testcases_path):
116        for sub_dir in dirs:
117            testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir)))
118
119
120def find_testdict_descriptors(config):
121    from xdevice import Variables
122    if getattr(config, ConfigConst.testdict, "") == "":
123        return None
124    testdict = config.testdict
125    test_descriptors = []
126    for test_type_key, files in testdict.items():
127        for file_name in files:
128            if not os.path.isabs(file_name):
129                file_name = os.path.join(Variables.exec_dir, file_name)
130            if os.path.isfile(file_name) and test_type_key in \
131                    TestDictSource.test_type.keys():
132                desc = _make_test_descriptor(os.path.abspath(file_name),
133                                             test_type_key)
134                if desc is not None:
135                    test_descriptors.append(desc)
136    if not test_descriptors:
137        raise ParamError("test source is none", error_no="00110")
138    return test_descriptors
139
140
141def _append_component_test_source(config, testcases_dir, test_sources):
142    subsystem_list = config.subsystems if config.subsystems else list()
143    part_list = config.parts if config.parts else list()
144    module_info_files = _get_component_info_file(testcases_dir)
145    result_dict = dict()
146    for info_file in module_info_files:
147        flags = os.O_RDONLY
148        modes = stat.S_IWUSR | stat.S_IRUSR
149        with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
150            result_dict.update(json.load(f_handler))
151        module_name = result_dict.get("module", "")
152        part_name = result_dict.get("part", "")
153        subsystem_name = result_dict.get("subsystem", "")
154        if not module_name or not part_name or not subsystem_name:
155            continue
156        module_config_file = \
157            os.path.join(os.path.dirname(info_file), module_name)
158        is_append = True
159        if subsystem_list or part_list:
160            if part_name not in part_list and \
161                    subsystem_name not in subsystem_list:
162                is_append = False
163        if is_append:
164            getattr(config, ConfigConst.component_mapper, dict()).update(
165                {module_name: (subsystem_name, part_name)})
166            test_sources.append(module_config_file)
167
168
169def _get_test_sources(config, testcases_dirs):
170    test_sources = []
171
172    # get test sources from testcases_dirs
173    if not config.testfile and not config.testlist and not config.testcase \
174            and not config.subsystems and not config.parts and not \
175            getattr(config, ConfigConst.component_base_kit, "") and \
176            config.task:
177        for testcases_dir in testcases_dirs:
178            _append_module_test_source(testcases_dir, test_sources)
179        return test_sources
180
181        # get test sources from config.testlist
182    if getattr(config, ConfigConst.testlist, ""):
183        for test_source in config.testlist.split(";"):
184            if test_source.strip():
185                test_sources.append(test_source.strip())
186        return test_sources
187
188        # get test sources from config.testfile
189    if getattr(config, ConfigConst.testfile, ""):
190        test_file = _get_test_file(config, testcases_dirs)
191        flags = os.O_RDONLY
192        modes = stat.S_IWUSR | stat.S_IRUSR
193        with os.fdopen(os.open(test_file, flags, modes), "r") as file_content:
194            if str(test_file).endswith(".json"):
195                content = file_content.read()
196                source_list, case_dict = parse_source_from_data(content)
197                test_sources.extend(source_list)
198                config.tf_suite = case_dict
199            else:
200                for line in file_content:
201                    if line.strip():
202                        test_sources.append(line.strip())
203
204        # get test sources from config.testcase
205    if getattr(config, ConfigConst.testcase, ""):
206        for test_source in config.testcase.split(";"):
207            if test_source.strip():
208                test_sources.append(test_source.strip())
209        return test_sources
210
211    if getattr(config, ConfigConst.subsystems, []) or \
212            getattr(config, ConfigConst.parts, []) or \
213            getattr(config, ConfigConst.component_base_kit, ""):
214        setattr(config, ConfigConst.component_mapper, dict())
215        for testcases_dir in testcases_dirs:
216            _append_component_test_source(config, testcases_dir, test_sources)
217        return test_sources
218    return test_sources
219
220
221def _append_module_test_source(testcases_path, test_sources):
222    if not os.path.isdir(testcases_path):
223        return
224    for item in os.listdir(testcases_path):
225        item_path = os.path.join(testcases_path, item)
226        if os.path.isfile(item_path) and item_path.endswith(
227                MODULE_CONFIG_SUFFIX):
228            test_sources.append(item_path)
229
230
231def _get_test_file(config, testcases_dirs):
232    if os.path.isabs(config.testfile):
233        if os.path.exists(config.testfile):
234            return config.testfile
235        else:
236            raise ParamError("test file '%s' not exists" % config.testfile,
237                             error_no="00110")
238
239    for testcases_dir in testcases_dirs:
240        test_file = os.path.join(testcases_dir, config.testfile)
241        if os.path.exists(test_file):
242            return test_file
243
244    raise ParamError("test file '%s' not exists" % config.testfile)
245
246
247def _normalize_test_sources(testcases_dirs, test_sources, config):
248    norm_test_sources = []
249    for test_source in test_sources:
250        append_result = False
251        for testcases_dir in testcases_dirs:
252            # append test source absolute path
253            append_result = _append_norm_test_source(
254                norm_test_sources, test_source, testcases_dir, config)
255            if append_result:
256                break
257
258        # append test source if no corresponding file founded
259        if not append_result:
260            norm_test_sources.append(test_source)
261    if not norm_test_sources:
262        raise ParamError("test source not found")
263    return norm_test_sources
264
265
266def _append_norm_test_source(norm_test_sources, test_source, testcases_dir,
267                             config):
268    # get norm_test_source
269    norm_test_source = test_source
270    if not os.path.isabs(test_source):
271        norm_test_source = os.path.abspath(
272            os.path.join(testcases_dir, test_source))
273
274    # find py or pyd for test case input
275    if config.testcase and not config.testlist:
276        if os.path.isfile("%s%s" % (norm_test_source, PY_SUFFIX)):
277            norm_test_sources.append(
278                "%s%s" % (norm_test_source, PY_SUFFIX))
279            return True
280        elif os.path.isfile("%s%s" % (norm_test_source, PYD_SUFFIX)):
281            norm_test_sources.append(
282                "%s%s" % (norm_test_source, PYD_SUFFIX))
283            return True
284        return False
285
286    # append to norm_test_sources
287    if os.path.isfile(norm_test_source):
288        norm_test_sources.append(norm_test_source)
289        return True
290    elif os.path.isfile("%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX)):
291        norm_test_sources.append("%s%s" % (norm_test_source,
292                                           MODULE_CONFIG_SUFFIX))
293        return True
294    return False
295
296
297def _make_test_descriptor(file_path, test_type_key):
298    from _core.executor.request import Descriptor
299    if test_type_key is None:
300        return None
301
302    # get params
303    filename, _ = get_filename_extension(file_path)
304    uid = unique_id("TestSource", filename)
305    test_type = TestDictSource.test_type[test_type_key]
306    config_file = _get_config_file(
307        os.path.join(os.path.dirname(file_path), filename))
308
309    module_name = _parse_module_name(config_file, filename)
310    # make test descriptor
311    desc = Descriptor(uuid=uid, name=filename,
312                      source=TestSource(file_path, "", config_file, filename,
313                                        test_type, module_name))
314    return desc
315
316
317def _get_test_driver(test_source):
318    try:
319        from _core.testkit.json_parser import JsonParser
320        json_config = JsonParser(test_source)
321        return json_config.get_driver_type()
322    except ParamError as error:
323        LOG.error(error, error_no=error.error_no)
324        return ""
325
326
327def _make_test_descriptors_from_testsources(test_sources, config):
328    test_descriptors = []
329
330    for test_source in test_sources:
331        filename, ext = test_source.split()[0], "str"
332        if os.path.isfile(test_source):
333            filename, ext = get_filename_extension(test_source)
334
335        test_driver = config.testdriver
336        if is_config_str(test_source):
337            test_driver = _get_test_driver(test_source)
338
339        # get params
340        config_file = _get_config_file(
341            os.path.join(os.path.dirname(test_source), filename), ext, config)
342        test_type = _get_test_type(config_file, test_driver, ext)
343        if not config_file:
344            if getattr(config, ConfigConst.testcase, "") and not \
345                    getattr(config, ConfigConst.testlist):
346                LOG.debug("Can't find the json file of config")
347                from xdevice import Scheduler
348                if Scheduler.device_labels:
349                    config_file, test_type = _generate_config_file(
350                        Scheduler.device_labels,
351                        os.path.join(os.path.dirname(test_source), filename),
352                        ext, test_type)
353                    setattr(Scheduler, "tmp_json", config_file)
354                    LOG.debug("Generate temp json success: %s" % config_file)
355        desc = _create_descriptor(config_file, filename, test_source,
356                                  test_type, config)
357        if desc:
358            test_descriptors.append(desc)
359
360    return test_descriptors
361
362
363def _create_descriptor(config_file, filename, test_source, test_type, config):
364    from xdevice import Scheduler
365    from _core.executor.request import Descriptor
366
367    error_message = ""
368    if not test_type:
369        error_message = "no driver to execute '%s'" % test_source
370        LOG.error(error_message, error_no="00112")
371        if Scheduler.mode != ModeType.decc:
372            return None
373
374    # create Descriptor
375    uid = unique_id("TestSource", filename)
376    module_name = _parse_module_name(config_file, filename)
377    desc = Descriptor(uuid=uid, name=filename,
378                      source=TestSource(test_source, "", config_file,
379                                        filename, test_type, module_name))
380    if not os.path.isfile(test_source):
381        if is_config_str(test_source):
382            desc = Descriptor(uuid=uid, name=filename,
383                              source=TestSource("", test_source, config_file,
384                                                filename, test_type,
385                                                module_name))
386        else:
387            if config.testcase and not config.testlist:
388                error_message = "test case '%s' or '%s' not exists" % (
389                        "%s%s" % (test_source, PY_SUFFIX), "%s%s" % (
390                            test_source, PYD_SUFFIX))
391                error_no = "00103"
392            else:
393                error_message = "test source '%s' or '%s' not exists" % (
394                    test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX))
395                error_no = "00102"
396            desc.error = ParamError(error_message, error_no=error_no)
397
398    if Scheduler.mode == ModeType.decc and error_message:
399        Scheduler.report_not_executed(config.report_path, [("", desc)],
400                                      error_message)
401        return None
402
403    return desc
404
405
406def _get_config_file(filename, ext=None, config=None):
407    config_file = None
408    if os.path.exists("%s%s" % (filename, MODULE_CONFIG_SUFFIX)):
409        config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX)
410        return config_file
411    if ext and os.path.exists("%s%s%s" % (filename, ext,
412                                          MODULE_CONFIG_SUFFIX)):
413        config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX)
414        return config_file
415    if config and getattr(config, "testcase", "") and not getattr(
416            config, "testlist"):
417        return _get_testcase_config_file(filename)
418
419    return config_file
420
421
422def _get_testcase_config_file(filename):
423    depth = 1
424    dirname = os.path.dirname(filename)
425    while dirname and depth < MAX_DIR_DEPTH:
426        for item in os.listdir(dirname):
427            item_path = os.path.join(dirname, item)
428            if os.path.isfile(item_path) and item.endswith(
429                    MODULE_CONFIG_SUFFIX):
430                return item_path
431        depth += 1
432        dirname = os.path.dirname(dirname)
433    return None
434
435
436def _get_component_info_file(entry_dir):
437    module_files = []
438    if not os.path.isdir(entry_dir):
439        return module_files
440    for item in os.listdir(entry_dir):
441        item_path = os.path.join(entry_dir, item)
442        if os.path.isfile(item_path) and item_path.endswith(
443                MODULE_INFO_SUFFIX):
444            module_files.append(item_path)
445    return module_files
446
447
448def _get_test_type(config_file, test_driver, ext):
449    if test_driver:
450        return test_driver
451
452    if config_file:
453        if not os.path.exists(config_file):
454            LOG.error("Config file '%s' not exists" % config_file,
455                      error_no="00110")
456            return ""
457        return _get_test_driver(config_file)
458    if ext in [".py", ".js", ".dex", ".hap", ".bin"] \
459            and ext in TestDictSource.exe_type.keys():
460        test_type = TestDictSource.exe_type[ext]
461    elif ext in [".apk"] and ext in TestDictSource.exe_type.keys():
462        test_type = DeviceTestType.hap_test
463    else:
464        test_type = DeviceTestType.cpp_test
465    return test_type
466
467
468def _parse_module_name(config_file, file_name):
469    if config_file:
470        return get_filename_extension(config_file)[0]
471    else:
472        if "{" in file_name:
473            return "report"
474        return file_name
475
476
477def _generate_config_file(device_labels, filename, ext, test_type):
478    if test_type not in [HostDrivenTestType.device_test]:
479        test_type = HostDrivenTestType.device_test
480    top_dict = {"environment": [], "driver": {"type": test_type,
481                "py_file": "%s%s" % (filename, ext)}}
482    for label in device_labels:
483        device_json_list = top_dict.get("environment")
484        device_json_list.append({"type": "device", "label": label})
485
486    save_file = os.path.join(os.path.dirname(filename),
487                             "%s.json" % os.path.basename(filename))
488    save_file_open = \
489        os.open(save_file, os.O_WRONLY | os.O_CREAT, FilePermission.mode_755)
490    with os.fdopen(save_file_open, "w") as save_handler:
491        save_handler.write(json.dumps(top_dict, indent=4))
492    return save_file, test_type
493
494
495def parse_source_from_data(content):
496    source_list = list()
497    data = dict(json.loads(content))
498    case_dict = dict()
499    for item in data.get("suite", list()):
500        name = item.get("module_name", "")
501        case_dict.update({name: item})
502        source_list.append(name)
503    return source_list, case_dict
504
505class TestDictSource:
506    exe_type = copy.deepcopy(EXT_TYPE_DICT)
507    test_type = copy.deepcopy(TEST_TYPE_DICT)
508
509    @classmethod
510    def reset(cls):
511        cls.test_type = copy.deepcopy(TEST_TYPE_DICT)
512        cls.exe_type = copy.deepcopy(EXT_TYPE_DICT)
513
514    @classmethod
515    def clear(cls):
516        cls.test_type.clear()
517        cls.exe_type.clear()
518