• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import json
21import copy
22import stat
23from collections import namedtuple
24
25from _core.constants import DeviceTestType
26from _core.constants import SchedulerType
27from _core.constants import ModeType
28from _core.constants import HostDrivenTestType
29from _core.constants import FilePermission
30from _core.constants import ConfigConst
31from _core.error import ErrorMessage
32from _core.exception import ParamError
33from _core.logger import platform_logger
34from _core.testkit.json_parser import JsonParser
35from _core.utils import get_filename_extension
36from _core.utils import is_config_str
37from _core.utils import unique_id
38
39from _core.context.handler import report_not_executed
40from _core.context.center import Context
41from _core.context.upload import Uploader
42
43__all__ = ["TestSetSource", "TestSource", "find_test_descriptors",
44           "find_testdict_descriptors", "TestDictSource"]
45
46TestSetSource = namedtuple('TestSetSource', 'set')
47TestSource = namedtuple('TestSource', 'source_file source_string config_file '
48                                      'test_name test_type module_name module_subsystem')
49
50TEST_TYPE_DICT = {"DEX": DeviceTestType.dex_test,
51                  "HAP": DeviceTestType.hap_test,
52                  "APK": DeviceTestType.hap_test,
53                  "PYT": HostDrivenTestType.device_test,
54                  "JST": DeviceTestType.jsunit_test,
55                  "OHJST": DeviceTestType.oh_jsunit_test,
56                  "CXX": DeviceTestType.cpp_test,
57                  "BIN": DeviceTestType.lite_cpp_test}
58EXT_TYPE_DICT = {".dex": DeviceTestType.dex_test,
59                 ".hap": DeviceTestType.hap_test,
60                 ".apk": DeviceTestType.hap_test,
61                 ".py": HostDrivenTestType.device_test,
62                 ".js": DeviceTestType.jsunit_test,
63                 ".bin": DeviceTestType.lite_cpp_test,
64                 "default": DeviceTestType.cpp_test}
65PY_SUFFIX = ".py"
66PYD_SUFFIX = ".pyd"
67MODULE_CONFIG_SUFFIX = ".json"
68MODULE_INFO_SUFFIX = ".moduleInfo"
69MAX_DIR_DEPTH = 6
70NO_MODULE_SUBSYSTEM = "zzzzzzzz"
71LOG = platform_logger("TestSource")
72
73
74def find_test_descriptors(config):
75    if not config.testfile and not config.testlist and not config.task and \
76            not config.testcase and not config.subsystems and \
77            not config.parts:
78        return None
79
80    # get test sources
81    testcases_dirs = _get_testcases_dirs(config)
82    test_sources = _get_test_sources(config, testcases_dirs)
83    LOG.debug("Test sources: %s", test_sources)
84
85    # normalize test sources
86    test_sources = _normalize_test_sources(testcases_dirs, test_sources,
87                                           config)
88
89    # make test descriptors
90    test_descriptors = _make_test_descriptors_from_testsources(test_sources,
91                                                               config)
92    return test_descriptors
93
94
95def _get_testcases_dirs(config):
96    from xdevice import Variables
97    # add config.testcases_path and its subfolders
98    testcases_dirs = []
99    if getattr(config, ConfigConst.testcases_path, ""):
100        testcases_dirs = [config.testcases_path]
101        _append_subfolders(config.testcases_path, testcases_dirs)
102
103    # add inner testcases dir and its subfolders
104    inner_testcases_dir = os.path.abspath(os.path.join(
105        Variables.top_dir, "testcases"))
106    if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase(
107            config.testcases_path) != os.path.normcase(inner_testcases_dir):
108        testcases_dirs.append(inner_testcases_dir)
109        _append_subfolders(inner_testcases_dir, testcases_dirs)
110
111    # add execution dir and top dir
112    testcases_dirs.append(Variables.exec_dir)
113    if os.path.normcase(Variables.exec_dir) != os.path.normcase(
114            Variables.top_dir):
115        testcases_dirs.append(Variables.top_dir)
116
117    LOG.debug("Testcases directories: %s", testcases_dirs)
118    return testcases_dirs
119
120
121def _append_subfolders(testcases_path, testcases_dirs):
122    ignore_folders = ConfigConst.ignore_testcases_path.split("|")
123    for root, dirs, _ in os.walk(testcases_path):
124        dirs[:] = [d for d in dirs if d not in ignore_folders]
125        for sub_dir in dirs:
126            testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir)))
127
128
129def find_testdict_descriptors(config):
130    from xdevice import Variables
131    if getattr(config, ConfigConst.testdict, "") == "":
132        return None
133    testdict = config.testdict
134    test_descriptors = []
135    for test_type_key, files in testdict.items():
136        for file_name in files:
137            if not os.path.isabs(file_name):
138                file_name = os.path.join(Variables.exec_dir, file_name)
139            if os.path.isfile(file_name) and test_type_key in \
140                    TestDictSource.test_type.keys():
141                desc = _make_test_descriptor(os.path.abspath(file_name),
142                                             test_type_key)
143                if desc is not None:
144                    test_descriptors.append(desc)
145    if not test_descriptors:
146        raise ParamError(ErrorMessage.Common.Code_0101023)
147    return test_descriptors
148
149
150def _append_component_test_source(config, testcases_dir, test_sources):
151    subsystem_list = config.subsystems if config.subsystems else list()
152    part_list = config.parts if config.parts else list()
153    module_info_files = _get_component_info_file(testcases_dir)
154    result_dict = dict()
155    for info_file in module_info_files:
156        flags = os.O_RDONLY
157        modes = stat.S_IWUSR | stat.S_IRUSR
158        with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
159            result_dict.update(json.load(f_handler))
160        module_name = result_dict.get("module", "")
161        part_name = result_dict.get("part", "")
162        subsystem_name = result_dict.get("subsystem", "")
163        if not module_name or not part_name or not subsystem_name:
164            continue
165        module_config_file = \
166            os.path.join(os.path.dirname(info_file), module_name)
167        is_append = True
168        if subsystem_list or part_list:
169            if part_name not in part_list and \
170                    subsystem_name not in subsystem_list:
171                is_append = False
172        if is_append:
173            getattr(config, ConfigConst.component_mapper, dict()).update(
174                {module_name: (subsystem_name, part_name)})
175            test_sources.append(module_config_file)
176
177
178def _get_test_sources(config, testcases_dirs):
179    test_sources = []
180
181    # get test sources from testcases_dirs
182    if not config.testfile and not config.testlist and not config.testcase \
183            and not config.subsystems and not config.parts and not \
184            getattr(config, ConfigConst.component_base_kit, "") and \
185            config.task:
186        for testcases_dir in testcases_dirs:
187            _append_module_test_source(testcases_dir, test_sources)
188        return test_sources
189
190        # get test sources from config.testlist
191    if getattr(config, ConfigConst.testlist, ""):
192        for test_source in config.testlist.split(";"):
193            if test_source.strip():
194                test_sources.append(test_source.strip())
195        return test_sources
196
197        # get test sources from config.testfile
198    if getattr(config, ConfigConst.testfile, ""):
199        test_file = _get_test_file(config, testcases_dirs)
200        flags = os.O_RDONLY
201        modes = stat.S_IWUSR | stat.S_IRUSR
202        with os.fdopen(os.open(test_file, flags, modes), "r") as file_content:
203            if str(test_file).endswith(".json"):
204                content = file_content.read()
205                source_list, case_dict = parse_source_from_data(content)
206                test_sources.extend(source_list)
207                config.tf_suite = case_dict
208            else:
209                for line in file_content:
210                    if line.strip():
211                        test_sources.append(line.strip())
212
213        # get test sources from config.testcase
214    if getattr(config, ConfigConst.testcase, ""):
215        for test_source in config.testcase.split(";"):
216            if test_source.strip():
217                test_sources.append(test_source.strip())
218        return test_sources
219
220    if getattr(config, ConfigConst.subsystems, []) or \
221            getattr(config, ConfigConst.parts, []) or \
222            getattr(config, ConfigConst.component_base_kit, ""):
223        setattr(config, ConfigConst.component_mapper, dict())
224        for testcases_dir in testcases_dirs:
225            _append_component_test_source(config, testcases_dir, test_sources)
226        return test_sources
227    return test_sources
228
229
230def _append_module_test_source(testcases_path, test_sources):
231    if not os.path.isdir(testcases_path):
232        return
233    for item in os.listdir(testcases_path):
234        if not item.endswith(MODULE_CONFIG_SUFFIX):
235            continue
236        item_path = os.path.join(testcases_path, item)
237        if os.path.isfile(item_path):
238            name = item[:-5]
239            if len(name) != len(name.strip()):
240                LOG.warning(f"test source '{item}' contains spaces at the beginning or end")
241            else:
242                test_sources.append(item_path)
243
244
245def _get_test_file(config, testcases_dirs):
246    if os.path.isabs(config.testfile):
247        if os.path.exists(config.testfile):
248            return config.testfile
249        else:
250            raise ParamError(ErrorMessage.Common.Code_0101024.format(config.testfile))
251
252    for testcases_dir in testcases_dirs:
253        test_file = os.path.join(testcases_dir, config.testfile)
254        if os.path.exists(test_file):
255            return test_file
256
257    raise ParamError(ErrorMessage.Common.Code_0101024.format(config.testfile))
258
259
260def _normalize_test_sources(testcases_dirs, test_sources, config):
261    norm_test_sources = []
262    for test_source in test_sources:
263        append_result = False
264        for testcases_dir in testcases_dirs:
265            # append test source absolute path
266            append_result = _append_norm_test_source(
267                norm_test_sources, test_source, testcases_dir, config)
268            if append_result:
269                break
270
271        # append test source if no corresponding file founded
272        if not append_result:
273            norm_test_sources.append(test_source)
274    if not norm_test_sources:
275        raise ParamError(ErrorMessage.Common.Code_0101023)
276    return norm_test_sources
277
278
279def _append_norm_test_source(norm_test_sources, test_source, testcases_dir,
280                             config):
281    # get norm_test_source
282    norm_test_source = test_source
283    if not os.path.isabs(test_source):
284        norm_test_source = os.path.abspath(
285            os.path.join(testcases_dir, test_source))
286
287    # find py or pyd for test case input
288    if config.testcase and not config.testlist:
289        if os.path.isfile("%s%s" % (norm_test_source, PY_SUFFIX)):
290            norm_test_sources.append(
291                "%s%s" % (norm_test_source, PY_SUFFIX))
292            return True
293        elif os.path.isfile("%s%s" % (norm_test_source, PYD_SUFFIX)):
294            norm_test_sources.append(
295                "%s%s" % (norm_test_source, PYD_SUFFIX))
296            return True
297        return False
298
299    # append to norm_test_sources
300    if os.path.isfile(norm_test_source):
301        norm_test_sources.append(norm_test_source)
302        return True
303    elif os.path.isfile("%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX)):
304        norm_test_sources.append("%s%s" % (norm_test_source,
305                                           MODULE_CONFIG_SUFFIX))
306        return True
307    return False
308
309
310def _make_test_descriptor(file_path, test_type_key):
311    from _core.executor.request import Descriptor
312    if test_type_key is None:
313        return None
314
315    # get params
316    filename, _ = get_filename_extension(file_path)
317    uid = unique_id("TestSource", filename)
318    test_type = TestDictSource.test_type.get(test_type_key)
319    config_file = _get_config_file(
320        os.path.join(os.path.dirname(file_path), filename))
321
322    module_info = ""
323    if config_file:
324        module_info = _get_module_info(config_file)
325
326    module_name = _parse_module_name(config_file, filename)
327    # make test descriptor
328    desc = Descriptor(uuid=uid, name=filename,
329                      source=TestSource(file_path, "", config_file, filename,
330                                        test_type, module_name, module_info))
331    return desc
332
333
334def _get_test_driver(test_source):
335    try:
336        json_config = JsonParser(test_source)
337        return json_config.get_driver_type()
338    except ParamError as error:
339        LOG.error(error, error_no=error.error_no)
340        return ""
341
342
343def _make_test_descriptors_from_testsources(test_sources, config):
344    test_descriptors = []
345
346    for test_source in test_sources:
347        filename, ext = test_source.split()[0], "str"
348        if os.path.isfile(test_source):
349            filename, ext = get_filename_extension(test_source)
350
351        test_driver = config.testdriver
352        if is_config_str(test_source):
353            test_driver = _get_test_driver(test_source)
354
355        # get params
356        config_file = _get_config_file(
357            os.path.join(os.path.dirname(test_source), filename), ext, config)
358        test_type = _get_test_type(config_file, test_driver, ext)
359        if not config_file:
360            if getattr(config, ConfigConst.testcase, "") and not \
361                    getattr(config, ConfigConst.testlist):
362                LOG.debug("Can't find the json file of config")
363                if Context.session().device_labels:
364                    config_file, test_type = _generate_config_file(
365                        Context.session().device_labels,
366                        os.path.join(os.path.dirname(test_source), filename),
367                        ext, test_type)
368                    setattr(Uploader, "tmp_json", config_file)
369                    LOG.debug("Generate temp json success: %s" % config_file)
370        module_info = NO_MODULE_SUBSYSTEM
371        if config.scheduler == SchedulerType.module and config_file:
372            module_info = _get_module_info(config_file)
373        desc = _create_descriptor(config_file, filename, test_source,
374                                  test_type, config, module_info)
375        if desc:
376            test_descriptors.append(desc)
377
378    return test_descriptors
379
380
381def _get_module_info(config_file):
382    module_info_path = config_file.replace(MODULE_CONFIG_SUFFIX, MODULE_INFO_SUFFIX)
383    if not os.path.exists(module_info_path):
384        return NO_MODULE_SUBSYSTEM
385    try:
386        from _core.testkit.json_parser import JsonParser
387        json_config = JsonParser(module_info_path)
388        return json_config.get_module_subsystem()
389    except ParamError as error:
390        LOG.warning(error, error_no=error.error_no)
391        return NO_MODULE_SUBSYSTEM
392
393
394def _create_descriptor(config_file, filename, test_source, test_type, config, module_info):
395    from _core.executor.request import Descriptor
396
397    error_message = ""
398    if not test_type:
399        error_message = "no driver to execute '%s'" % test_source
400        LOG.error(error_message, error_no="00112")
401        if Context.session().mode != ModeType.decc:
402            return None
403
404    # create Descriptor
405    uid = unique_id("TestSource", filename)
406    module_name = _parse_module_name(config_file, filename)
407    desc = Descriptor(uuid=uid, name=filename,
408                      source=TestSource(test_source, "", config_file,
409                                        filename, test_type, module_name, module_info))
410    if not os.path.isfile(test_source):
411        if is_config_str(test_source):
412            desc = Descriptor(uuid=uid, name=filename,
413                              source=TestSource("", test_source, config_file,
414                                                filename, test_type,
415                                                module_name, module_info))
416        else:
417            if config.testcase and not config.testlist:
418                error_message = "test case '%s' or '%s' not exists" % (
419                    "%s%s" % (test_source, PY_SUFFIX), "%s%s" % (
420                        test_source, PYD_SUFFIX))
421                error_no = "00103"
422            else:
423                error_message = "{}".format(test_source)
424                error_no = "00102"
425            desc.error = ParamError(error_message, error_no=error_no)
426
427    if Context.session().mode == ModeType.decc and error_message:
428        report_not_executed(config.report_path, [("", desc)], error_message)
429        return None
430
431    return desc
432
433
434def _get_config_file(filename, ext=None, config=None):
435    config_file = None
436    if os.path.exists("%s%s" % (filename, MODULE_CONFIG_SUFFIX)):
437        config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX)
438        return config_file
439    if ext and os.path.exists("%s%s%s" % (filename, ext,
440                                          MODULE_CONFIG_SUFFIX)):
441        config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX)
442        return config_file
443    if config and getattr(config, "testcase", "") and not getattr(
444            config, "testlist"):
445        return _get_testcase_config_file(filename)
446
447    return config_file
448
449
450def _get_testcase_config_file(filename):
451    depth = 1
452    dirname = os.path.dirname(filename)
453    while dirname and depth < MAX_DIR_DEPTH:
454        for item in os.listdir(dirname):
455            item_path = os.path.join(dirname, item)
456            if os.path.isfile(item_path) and item.endswith(
457                    MODULE_CONFIG_SUFFIX):
458                return item_path
459        depth += 1
460        dirname = os.path.dirname(dirname)
461    return None
462
463
464def _get_component_info_file(entry_dir):
465    module_files = []
466    if not os.path.isdir(entry_dir):
467        return module_files
468    for item in os.listdir(entry_dir):
469        item_path = os.path.join(entry_dir, item)
470        if os.path.isfile(item_path) and item_path.endswith(
471                MODULE_INFO_SUFFIX):
472            module_files.append(item_path)
473    return module_files
474
475
476def _get_test_type(config_file, test_driver, ext):
477    if test_driver:
478        return test_driver
479
480    if config_file:
481        if not os.path.exists(config_file):
482            LOG.error("Config file '%s' not exists" % config_file,
483                      error_no="00110")
484            return ""
485        return _get_test_driver(config_file)
486    if ext in [".py", ".js", ".dex", ".hap", ".bin"] \
487            and ext in TestDictSource.exe_type.keys():
488        test_type = TestDictSource.exe_type.get(ext)
489    elif ext in [".apk"] and ext in TestDictSource.exe_type.keys():
490        test_type = DeviceTestType.hap_test
491    else:
492        test_type = DeviceTestType.cpp_test
493    return test_type
494
495
496def _parse_module_name(config_file, file_name):
497    if config_file:
498        return get_filename_extension(config_file)[0]
499    else:
500        if "{" in file_name:
501            return "report"
502        return file_name
503
504
505def _generate_config_file(device_labels, filename, ext, test_type):
506    if test_type not in [HostDrivenTestType.device_test]:
507        test_type = HostDrivenTestType.device_test
508    top_dict = {"environment": [], "driver": {"type": test_type,
509                                              "py_file": "%s%s" % (filename, ext)}}
510    for label in device_labels:
511        device_json_list = top_dict.get("environment")
512        device_json_list.append({"type": "device", "label": label})
513
514    save_file = os.path.join(os.path.dirname(filename),
515                             "%s.json" % os.path.basename(filename))
516    save_file_open = \
517        os.open(save_file, os.O_WRONLY | os.O_CREAT, FilePermission.mode_755)
518    with os.fdopen(save_file_open, "w") as save_handler:
519        save_handler.write(json.dumps(top_dict, indent=4))
520    return save_file, test_type
521
522
523def parse_source_from_data(content):
524    source_list = list()
525    data = dict(json.loads(content))
526    case_dict = dict()
527    for item in data.get("suite", list()):
528        name = item.get("module_name", "")
529        case_dict.update({name: item})
530        source_list.append(name)
531    return source_list, case_dict
532
533
534class TestDictSource:
535    exe_type = copy.deepcopy(EXT_TYPE_DICT)
536    test_type = copy.deepcopy(TEST_TYPE_DICT)
537
538    @classmethod
539    def reset(cls):
540        cls.test_type = copy.deepcopy(TEST_TYPE_DICT)
541        cls.exe_type = copy.deepcopy(EXT_TYPE_DICT)
542
543    @classmethod
544    def clear(cls):
545        cls.test_type.clear()
546        cls.exe_type.clear()
547