• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import copy
21import json
22import sys
23from json import JSONDecodeError
24from core.utils import get_build_output_path
25from core.common import is_open_source_product
26
27from core.utils import get_file_list_by_postfix
28from core.config.config_manager import FilterConfigManager
29from xdevice import platform_logger
30from xdevice import DeviceTestType
31from xdevice import Binder
32
# Module-level logger, created with the name "TestcaseManager".
LOG = platform_logger("TestcaseManager")

# Template result dictionary: one empty list per test-file category key.
# The TestCaseManager methods deep-copy this template before appending file
# paths, so the lists stored here stay empty.
TESTFILE_TYPE_DATA_DIC = {
    "DEX": [],
    "HAP": [],
    "PYT": [],
    "CXX": [],
    "BIN": [],
    "OHJST": [],
    "JST": [],
    "LTPPosix": [],
    "OHRust": []
}
# File-name suffixes that get_all_test_file skips while collecting suites.
FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]
47
48
49class TestCaseManager(object):
50    @classmethod
51    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
52        partlist = options.partname_list
53        testmodule = options.testmodule
54        testsuit = options.testsuit
55
56        if not suite_file.startswith(test_case_out_path):
57            return False
58
59        if testsuit != "":
60            short_name, _ = os.path.splitext(os.path.basename(suite_file))
61            testsuit_list = testsuit.split(',')
62            for test in testsuit_list:
63                if short_name.startswith(test) or \
64                        testsuit.startswith(short_name):
65                    return True
66            return False
67
68        is_valid_status = False
69        suitfile_subpath = suite_file.replace(test_case_out_path, "")
70        suitfile_subpath = suitfile_subpath.strip(os.sep)
71        if len(partlist) == 0:
72            if testmodule != "":
73                temp_list = suitfile_subpath.split(os.sep)
74                if len(temp_list) > 2 and testmodule == temp_list[1]:
75                    is_valid_status = True
76            else:
77                is_valid_status = True
78        else:
79            for partname in partlist:
80                if testmodule != "":
81                    module_list = testmodule.split(",")
82                    for module in module_list:
83                        if suitfile_subpath.startswith(
84                            partname + os.sep + module + os.sep):
85                            is_valid_status = True
86                            break
87                else:
88                    if suitfile_subpath.startswith(partname + os.sep):
89                        is_valid_status = True
90                        break
91        return is_valid_status
92
93    @classmethod
94    def check_python_test_file(cls, suite_file):
95        if suite_file.endswith(".py"):
96            filename = os.path.basename(suite_file)
97            if filename.startswith("test_"):
98                return True
99        return False
100
101    @classmethod
102    def check_hap_test_file(cls, hap_file_path):
103        try:
104            if hap_file_path.endswith(".hap"):
105                json_file_path = hap_file_path.replace(".hap", ".json")
106                if os.path.exists(json_file_path):
107                    with open(json_file_path, 'r') as json_file:
108                        data_dic = json.load(json_file)
109                        if not data_dic:
110                            return False
111                        else:
112                            if "kits" in data_dic.keys():
113                                kits_list = data_dic.get("kits")
114                                if len(kits_list) > 0:
115                                    for kits_dict in kits_list:
116                                        if "test-file-name" not in kits_dict.keys():
117                                            continue
118                                        else:
119                                            return True
120                                else:
121                                    return False
122            return False
123        except JSONDecodeError:
124            return False
125        finally:
126            print(" check hap test file finally")
127
128    @classmethod
129    def get_hap_test_driver(cls, hap_file_path):
130        data_dic = cls.get_hap_json(hap_file_path)
131        if not data_dic:
132            return ""
133        else:
134            if "driver" in data_dic.keys():
135                driver_dict = data_dic.get("driver")
136                if bool(driver_dict):
137                    driver_type = driver_dict.get("type")
138                    return driver_type
139                else:
140                    LOG.error("%s has not set driver." % hap_file_path)
141                    return ""
142            else:
143                return ""
144
145    @classmethod
146    def get_hap_json(cls, hap_file_path):
147        if hap_file_path.endswith(".hap"):
148            json_file_path = hap_file_path.replace(".hap", ".json")
149            if os.path.exists(json_file_path):
150                with open(json_file_path, 'r') as json_file:
151                    data_dic = json.load(json_file)
152                    return data_dic
153            else:
154                return {}
155        else:
156            return {}
157
158    @classmethod
159    def get_hap_part_json(cls, hap_file_path):
160        if hap_file_path.endswith(".hap"):
161            json_file_path = hap_file_path.replace(".hap", ".moduleInfo")
162            if os.path.exists(json_file_path):
163                with open(json_file_path, 'r') as json_file:
164                    data_dic = json.load(json_file)
165                    return data_dic
166            else:
167                return {}
168        else:
169            return {}
170
171    @classmethod
172    def get_part_name_test_file(cls, hap_file_path):
173        data_dic = cls.get_hap_part_json(hap_file_path)
174        if not data_dic:
175            return ""
176        else:
177            if "part" in data_dic.keys():
178                part_name = data_dic["part"]
179                return part_name
180            else:
181                return ""
182
183    def get_test_files(self, test_case_path, options):
184        LOG.info("test case path: " + test_case_path)
185        LOG.info("test type list: " + str(options.testtype))
186        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
187        if os.path.exists(test_case_path):
188            if len(options.testtype) != 0:
189                test_type_list = options.testtype
190                suit_file_dic = self.get_test_file_data(
191                    test_case_path,
192                    test_type_list,
193                    options)
194        else:
195            LOG.error("%s is not exist." % test_case_path)
196        return suit_file_dic
197
198    def get_test_file_data(self, test_case_path, test_type_list, options):
199        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
200        for test_type in test_type_list:
201            temp_dic = self.get_test_file_data_by_test_type(
202                test_case_path,
203                test_type,
204                options)
205            for key, value in suit_file_dic.items():
206                suit_file_dic[key] = value + temp_dic.get(key)
207        return suit_file_dic
208
209    def get_test_file_data_by_test_type(self, test_case_path,
210                                        test_type, options):
211        suit_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
212        test_case_out_path = os.path.join(test_case_path, test_type)
213        if os.path.exists(test_case_out_path):
214            LOG.info("The test case directory: %s" % test_case_out_path)
215            return self.get_all_test_file(test_case_out_path, options)
216        else:
217            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
218        return suit_file_dictionary
219
220    def get_all_test_file(self, test_case_out_path, options):
221        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
222        filter_part_list = FilterConfigManager().get_filtering_list(
223            "subsystem_name", options.productform)
224        filter_list_test_file = FilterConfigManager().get_filtering_list(
225            "testfile_name", options.productform)
226        # 遍历测试用例输出目录下面的所有文件夹,每个文件夹对应一个子系统
227        command_list = options.current_raw_cmd.split(" ")
228        for part_name in os.listdir(test_case_out_path):
229            if "-ss" in command_list or "-tp" in command_list:
230                if part_name not in options.partname_list:
231                    continue
232            part_case_dir = os.path.join(test_case_out_path, part_name)
233            if not os.path.isdir(part_case_dir):
234                continue
235            # 如果子系统在fiter_config.xml配置文件的<subsystem_name>下面配置过,则过滤
236            if part_name in filter_part_list:
237                continue
238
239            # 获取子系统目录下面的所有文件路径列表
240            suite_file_list = get_file_list_by_postfix(part_case_dir)
241            for suite_file in suite_file_list:
242                # 如果文件在resource目录下面,需要过滤
243                if -1 != suite_file.replace(test_case_out_path, "").find(
244                        os.sep + "resource" + os.sep):
245                    continue
246
247                file_name = os.path.basename(suite_file)
248                # 如果文件在fiter_config.xml配置文件的<testfile_name>下面配置过,则过滤
249                if file_name in filter_list_test_file:
250                    continue
251
252                prefix_name, suffix_name = os.path.splitext(file_name)
253                if suffix_name in FILTER_SUFFIX_NAME_LIST:
254                    continue
255
256                if not self.get_valid_suite_file(test_case_out_path,
257                                                suite_file,
258                                                options):
259                    continue
260
261                if suffix_name == ".dex":
262                    suite_file_dictionary.get("DEX").append(suite_file)
263                elif suffix_name == ".hap":
264                    if self.get_hap_test_driver(suite_file) == "OHJSUnitTest":
265                        # 如果stage测试指定了-tp,只有部件名与moduleInfo中part一致的HAP包才会加入最终执行的队列
266                        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(
267                                suite_file):
268                            continue
269                        # 如果stage测试指定了-ts,只有完全匹配的HAP包才会加入最终执行的队列
270                        if options.testsuit != "":
271                            testsuit_list = options.testsuit.split(";")
272                            is_match = False
273                            for suite_item in testsuit_list:
274                                if suite_item == prefix_name:
275                                    is_match = True
276                                    break
277                            if not is_match:
278                                continue
279                        if not self.check_hap_test_file(suite_file):
280                            continue
281                        suite_file_dictionary.get("OHJST").append(suite_file)
282                    if self.get_hap_test_driver(suite_file) == "JSUnitTest":
283                        suite_file_dictionary.get("JST").append(suite_file)
284                elif suffix_name == ".py":
285                    if not self.check_python_test_file(suite_file):
286                        continue
287                    suite_file_dictionary.get("PYT").append(suite_file)
288                elif suffix_name == "":
289                    if file_name.startswith("rust_"):
290                        Binder.get_tdd_config().update_test_type_in_source(
291                            "OHRust", DeviceTestType.oh_rust_test)
292                        suite_file_dictionary.get("OHRust").append(suite_file)
293                    else:
294                        suite_file_dictionary.get("CXX").append(suite_file)
295                elif suffix_name == ".bin":
296                    suite_file_dictionary.get("BIN").append(suite_file)
297
298        return suite_file_dictionary
299
300    def get_part_deps_files(self, external_deps_path, testpart):
301        LOG.info("external_deps_path:" + external_deps_path)
302        if os.path.exists(external_deps_path):
303            with open(external_deps_path, 'r') as json_file:
304                data_dic = json.load(json_file)
305                if not data_dic:
306                    LOG.error("data_dic is empty.")
307                    return []
308                test_part = testpart[0]
309                if test_part in data_dic.keys():
310                    external_deps_part_list = data_dic.get(test_part)
311                    LOG.info("external_deps_part_list = %s" % external_deps_part_list)
312                    return external_deps_part_list
313                else:
314                    LOG.error("%s is not in part deps info json." % test_part)
315        else:
316            LOG.error("Part deps info %s is not exist." % external_deps_path)
317        return []
318
319    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
320        # 如果xts测试指定了-tp,只有部件名与moduleInfo中part一致的文件才会加入最终执行的队列
321        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(xts_suite_file):
322            return False
323        # 如果xts测试指定了-ts,只有完全匹配的文件才会加入最终执行的队列
324        if options.testsuit != "":
325            testsuit_list = options.testsuit.split(";")
326            for suite_item in testsuit_list:
327                if suite_item == prefix_name:
328                    return True
329            return False
330        return True
331
332    def get_xts_test_files(self, xts_test_case_path, options):
333        LOG.info("xts test case path: " + xts_test_case_path)
334        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
335        if not os.path.exists(xts_test_case_path):
336            LOG.error("xts %s is not exist." % xts_test_case_path)
337            return xts_suit_file_dic
338        # 获取XTS测试用例输出目录下面的所有文件路径列表
339        xts_suite_file_list = get_file_list_by_postfix(xts_test_case_path)
340        for xts_suite_file in xts_suite_file_list:
341            file_name = os.path.basename(xts_suite_file)
342            prefix_name, suffix_name = os.path.splitext(file_name)
343            if not self.check_xts_config_match(options, prefix_name, xts_suite_file):
344                continue
345            if suffix_name == "":
346                if file_name == "HatsOpenPosixTest":
347                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
348                else:
349                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
350            elif suffix_name == ".hap":
351                if self.get_hap_test_driver(xts_suite_file) == "OHJSUnitTest":
352                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
353                if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest":
354                    xts_suit_file_dic.get("JST").append(xts_suite_file)
355        return xts_suit_file_dic
356