• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import copy
21import json
22import sys
23from json import JSONDecodeError
24from core.utils import get_build_output_path
25from core.common import is_open_source_product
26
27from core.utils import get_file_list_by_postfix
28from core.config.config_manager import FilterConfigManager
29from xdevice import platform_logger
30from xdevice import DeviceTestType
31from xdevice import Binder
32
33LOG = platform_logger("TestcaseManager")
34
35TESTFILE_TYPE_DATA_DIC = {
36    "DEX": [],
37    "HAP": [],
38    "PYT": [],
39    "CXX": [],
40    "BIN": [],
41    "OHJST": [],
42    "JST": [],
43    "LTPPosix": [],
44    "OHRust": []
45}
46FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]
47
48
49class TestCaseManager(object):
50    @classmethod
51    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
52        partlist = options.partname_list
53        testmodule = options.testmodule
54        testsuit = options.testsuit
55
56        if not suite_file.startswith(test_case_out_path):
57            return False
58
59        if testsuit != "":
60            short_name, _ = os.path.splitext(os.path.basename(suite_file))
61            testsuit_list = testsuit.split(',')
62            for test in testsuit_list:
63                if short_name == test:
64                    return True
65            return False
66
67        is_valid_status = False
68        suitfile_subpath = suite_file.replace(test_case_out_path, "")
69        suitfile_subpath = suitfile_subpath.strip(os.sep)
70        if len(partlist) == 0:
71            if testmodule != "":
72                is_valid_status = False
73            else:
74                is_valid_status = True
75        else:
76            for partname in partlist:
77                if testmodule != "":
78                    module_list = testmodule.split(",")
79                    for module in module_list:
80                        if suitfile_subpath.startswith(
81                            partname + os.sep + module + os.sep):
82                            is_valid_status = True
83                            break
84                else:
85                    if suitfile_subpath.startswith(partname + os.sep):
86                        is_valid_status = True
87                        break
88        return is_valid_status
89
90    @classmethod
91    def check_python_test_file(cls, suite_file):
92        if suite_file.endswith(".py"):
93            filename = os.path.basename(suite_file)
94            if filename.startswith("test_"):
95                return True
96        return False
97
98    @classmethod
99    def check_hap_test_file(cls, hap_file_path):
100        try:
101            if hap_file_path.endswith(".hap"):
102                json_file_path = hap_file_path.replace(".hap", ".json")
103                if os.path.exists(json_file_path):
104                    with open(json_file_path, 'r') as json_file:
105                        data_dic = json.load(json_file)
106                        if not data_dic:
107                            return False
108                        else:
109                            if "kits" in data_dic.keys():
110                                kits_list = data_dic.get("kits")
111                                if len(kits_list) > 0:
112                                    for kits_dict in kits_list:
113                                        if "test-file-name" not in kits_dict.keys():
114                                            continue
115                                        else:
116                                            return True
117                                else:
118                                    return False
119            return False
120        except JSONDecodeError:
121            return False
122        finally:
123            print(" check hap test file finally")
124
125    @classmethod
126    def get_hap_test_driver(cls, hap_file_path):
127        data_dic = cls.get_hap_json(hap_file_path)
128        if not data_dic:
129            return ""
130        else:
131            if "driver" in data_dic.keys():
132                driver_dict = data_dic.get("driver")
133                if bool(driver_dict):
134                    driver_type = driver_dict.get("type")
135                    return driver_type
136                else:
137                    LOG.error("%s has not set driver." % hap_file_path)
138                    return ""
139            else:
140                return ""
141
142    @classmethod
143    def get_hap_json(cls, hap_file_path):
144        if hap_file_path.endswith(".hap"):
145            json_file_path = hap_file_path.replace(".hap", ".json")
146            if os.path.exists(json_file_path):
147                with open(json_file_path, 'r') as json_file:
148                    data_dic = json.load(json_file)
149                    return data_dic
150            else:
151                return {}
152        else:
153            return {}
154
155    @classmethod
156    def get_hap_part_json(cls, hap_file_path):
157        if hap_file_path.endswith(".hap"):
158            json_file_path = hap_file_path.replace(".hap", ".moduleInfo")
159            if os.path.exists(json_file_path):
160                with open(json_file_path, 'r') as json_file:
161                    data_dic = json.load(json_file)
162                    return data_dic
163            else:
164                return {}
165        else:
166            return {}
167
168    @classmethod
169    def get_part_name_test_file(cls, hap_file_path):
170        data_dic = cls.get_hap_part_json(hap_file_path)
171        if not data_dic:
172            return ""
173        else:
174            if "part" in data_dic.keys():
175                part_name = data_dic["part"]
176                return part_name
177            else:
178                return ""
179
180    def get_test_files(self, test_case_path, options):
181        LOG.info("test case path: " + test_case_path)
182        LOG.info("test type list: " + str(options.testtype))
183        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
184        if os.path.exists(test_case_path):
185            if len(options.testtype) != 0:
186                test_type_list = options.testtype
187                suit_file_dic = self.get_test_file_data(
188                    test_case_path,
189                    test_type_list,
190                    options)
191        else:
192            LOG.error("%s is not exist." % test_case_path)
193        return suit_file_dic
194
195    def get_test_file_data(self, test_case_path, test_type_list, options):
196        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
197        for test_type in test_type_list:
198            temp_dic = self.get_test_file_data_by_test_type(
199                test_case_path,
200                test_type,
201                options)
202            for key, value in suit_file_dic.items():
203                suit_file_dic[key] = value + temp_dic.get(key)
204        return suit_file_dic
205
206    def get_test_file_data_by_test_type(self, test_case_path,
207                                        test_type, options):
208        suit_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
209        test_case_out_path = os.path.join(test_case_path, test_type)
210        if os.path.exists(test_case_out_path):
211            LOG.info("The test case directory: %s" % test_case_out_path)
212            return self.get_all_test_file(test_case_out_path, options)
213        else:
214            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
215        return suit_file_dictionary
216
217    def get_all_test_file(self, test_case_out_path, options):
218        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
219        filter_part_list = FilterConfigManager().get_filtering_list(
220            "subsystem_name", options.productform)
221        filter_list_test_file = FilterConfigManager().get_filtering_list(
222            "testfile_name", options.productform)
223        # 遍历测试用例输出目录下面的所有文件夹,每个文件夹对应一个子系统
224        command_list = options.current_raw_cmd.split(" ")
225        for part_name in os.listdir(test_case_out_path):
226            if "-ss" in command_list or "-tp" in command_list:
227                if part_name not in options.partname_list:
228                    continue
229            part_case_dir = os.path.join(test_case_out_path, part_name)
230            if not os.path.isdir(part_case_dir):
231                continue
232            # 如果子系统在fiter_config.xml配置文件的<subsystem_name>下面配置过,则过滤
233            if part_name in filter_part_list:
234                continue
235
236            # 获取子系统目录下面的所有文件路径列表
237            suite_file_list = get_file_list_by_postfix(part_case_dir)
238            for suite_file in suite_file_list:
239                # 如果文件在resource目录下面,需要过滤
240                if -1 != suite_file.replace(test_case_out_path, "").find(
241                        os.sep + "resource" + os.sep):
242                    continue
243
244                file_name = os.path.basename(suite_file)
245                # 如果文件在fiter_config.xml配置文件的<testfile_name>下面配置过,则过滤
246                if file_name in filter_list_test_file:
247                    continue
248
249                prefix_name, suffix_name = os.path.splitext(file_name)
250                if suffix_name in FILTER_SUFFIX_NAME_LIST:
251                    continue
252
253                if not self.get_valid_suite_file(test_case_out_path,
254                                                suite_file,
255                                                options):
256                    continue
257
258                if suffix_name == ".dex":
259                    suite_file_dictionary.get("DEX").append(suite_file)
260                elif suffix_name == ".hap":
261                    if self.get_hap_test_driver(suite_file) == "OHJSUnitTest":
262                        # 如果stage测试指定了-tp,只有部件名与moduleInfo中part一致的HAP包才会加入最终执行的队列
263                        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(
264                                suite_file):
265                            continue
266                        # 如果stage测试指定了-ts,只有完全匹配的HAP包才会加入最终执行的队列
267                        if options.testsuit != "":
268                            testsuit_list = options.testsuit.split(";")
269                            is_match = False
270                            for suite_item in testsuit_list:
271                                if suite_item == prefix_name:
272                                    is_match = True
273                                    break
274                            if not is_match:
275                                continue
276                        if not self.check_hap_test_file(suite_file):
277                            continue
278                        suite_file_dictionary.get("OHJST").append(suite_file)
279                    if self.get_hap_test_driver(suite_file) == "JSUnitTest":
280                        suite_file_dictionary.get("JST").append(suite_file)
281                elif suffix_name == ".py":
282                    if not self.check_python_test_file(suite_file):
283                        continue
284                    suite_file_dictionary.get("PYT").append(suite_file)
285                elif suffix_name == "":
286                    if file_name.startswith("rust_"):
287                        Binder.get_tdd_config().update_test_type_in_source(
288                            "OHRust", DeviceTestType.oh_rust_test)
289                        suite_file_dictionary.get("OHRust").append(suite_file)
290                    else:
291                        suite_file_dictionary.get("CXX").append(suite_file)
292                elif suffix_name == ".bin":
293                    suite_file_dictionary.get("BIN").append(suite_file)
294
295        return suite_file_dictionary
296
297    def get_part_deps_files(self, external_deps_path, testpart):
298        LOG.info("external_deps_path:" + external_deps_path)
299        if os.path.exists(external_deps_path):
300            with open(external_deps_path, 'r') as json_file:
301                data_dic = json.load(json_file)
302                if not data_dic:
303                    LOG.error("data_dic is empty.")
304                    return []
305                test_part = testpart[0]
306                if test_part in data_dic.keys():
307                    external_deps_part_list = data_dic.get(test_part)
308                    LOG.info("external_deps_part_list = %s" % external_deps_part_list)
309                    return external_deps_part_list
310                else:
311                    LOG.error("%s is not in part deps info json." % test_part)
312        else:
313            LOG.error("Part deps info %s is not exist." % external_deps_path)
314        return []
315
316    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
317        # 如果xts测试指定了-tp,只有部件名与moduleInfo中part一致的文件才会加入最终执行的队列
318        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(xts_suite_file):
319            return False
320        # 如果xts测试指定了-ts,只有完全匹配的文件才会加入最终执行的队列
321        if options.testsuit != "":
322            testsuit_list = options.testsuit.split(";")
323            for suite_item in testsuit_list:
324                if suite_item == prefix_name:
325                    return True
326            return False
327        return True
328
329    def get_xts_test_files(self, xts_test_case_path, options):
330        LOG.info("xts test case path: " + xts_test_case_path)
331        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
332        if not os.path.exists(xts_test_case_path):
333            LOG.error("xts %s is not exist." % xts_test_case_path)
334            return xts_suit_file_dic
335        # 获取XTS测试用例输出目录下面的所有文件路径列表
336        xts_suite_file_list = get_file_list_by_postfix(xts_test_case_path)
337        for xts_suite_file in xts_suite_file_list:
338            file_name = os.path.basename(xts_suite_file)
339            prefix_name, suffix_name = os.path.splitext(file_name)
340            if not self.check_xts_config_match(options, prefix_name, xts_suite_file):
341                continue
342            if suffix_name == "":
343                if file_name == "HatsOpenPosixTest":
344                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
345                else:
346                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
347            elif suffix_name == ".hap":
348                if self.get_hap_test_driver(xts_suite_file) == "OHJSUnitTest":
349                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
350                if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest":
351                    xts_suit_file_dic.get("JST").append(xts_suite_file)
352        return xts_suit_file_dic
353