• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import copy
21import json
22import sys
23from json import JSONDecodeError
24from core.utils import get_build_output_path
25from core.common import is_open_source_product
26
27from core.utils import get_file_list_by_postfix
28from core.config.config_manager import FilterConfigManager
29from xdevice import platform_logger
30from xdevice import Scheduler
31from xdevice import DeviceTestType
32
33LOG = platform_logger("TestcaseManager")
34
35TESTFILE_TYPE_DATA_DIC = {
36    "DEX": [],
37    "HAP": [],
38    "PYT": [],
39    "CXX": [],
40    "BIN": [],
41    "OHJST": [],
42    "JST": [],
43    "LTPPosix": [],
44    "OHRust": []
45}
46FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]
47
48
49class TestCaseManager(object):
50    def get_test_files(self, test_case_path, options):
51        LOG.info("test case path: " + test_case_path)
52        LOG.info("test type list: " + str(options.testtype))
53        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
54        if os.path.exists(test_case_path):
55            if len(options.testtype) != 0:
56                test_type_list = options.testtype
57                suit_file_dic = self.get_test_file_data(
58                    test_case_path,
59                    test_type_list,
60                    options)
61        else:
62            LOG.error("%s is not exist." % test_case_path)
63        return suit_file_dic
64
65    def get_test_file_data(self, test_case_path, test_type_list, options):
66        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
67        for test_type in test_type_list:
68            temp_dic = self.get_test_file_data_by_test_type(
69                test_case_path,
70                test_type,
71                options)
72            for key, value in suit_file_dic.items():
73                suit_file_dic[key] = value + temp_dic.get(key)
74        return suit_file_dic
75
76    def get_test_file_data_by_test_type(self, test_case_path,
77                                        test_type, options):
78        suit_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
79        test_case_out_path = os.path.join(test_case_path, test_type)
80        if os.path.exists(test_case_out_path):
81            LOG.info("The test case directory: %s" % test_case_out_path)
82            return self.get_all_test_file(test_case_out_path, options)
83        else:
84            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
85        return suit_file_dictionary
86
87    def get_all_test_file(self, test_case_out_path, options):
88        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
89        filter_part_list = FilterConfigManager().get_filtering_list(
90            "subsystem_name", options.productform)
91        filter_list_test_file = FilterConfigManager().get_filtering_list(
92            "testfile_name", options.productform)
93        # 遍历测试用例输出目录下面的所有文件夹,每个文件夹对应一个子系统
94        for part_name in os.listdir(test_case_out_path):
95            part_case_dir = os.path.join(test_case_out_path, part_name)
96            if not os.path.isdir(part_case_dir):
97                continue
98            # 如果子系统在fiter_config.xml配置文件的<subsystem_name>下面配置过,则过滤
99            if part_name in filter_part_list:
100                continue
101
102            # 获取子系统目录下面的所有文件路径列表
103            suite_file_list = get_file_list_by_postfix(part_case_dir)
104            for suite_file in suite_file_list:
105                # 如果文件在resource目录下面,需要过滤
106                if -1 != suite_file.replace(test_case_out_path, "").find(
107                        os.sep + "resource" + os.sep):
108                    continue
109
110                file_name = os.path.basename(suite_file)
111                # 如果文件在fiter_config.xml配置文件的<testfile_name>下面配置过,则过滤
112                if file_name in filter_list_test_file:
113                    continue
114
115                prefix_name, suffix_name = os.path.splitext(file_name)
116                if suffix_name in FILTER_SUFFIX_NAME_LIST:
117                    continue
118
119                if not self.get_valid_suite_file(test_case_out_path,
120                                                suite_file,
121                                                options):
122                    continue
123
124                if suffix_name == ".dex":
125                    suite_file_dictionary.get("DEX").append(suite_file)
126                elif suffix_name == ".hap":
127                    if self.get_hap_test_driver(suite_file) == "OHJSUnitTest":
128                        # 如果stage测试指定了-tp,只有部件名与moduleInfo中part一致的HAP包才会加入最终执行的队列
129                        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(
130                                suite_file):
131                            continue
132                        # 如果stage测试指定了-ts,只有完全匹配的HAP包才会加入最终执行的队列
133                        if options.testsuit != "":
134                            testsuit_list = options.testsuit.split(";")
135                            is_match = False
136                            for suite_item in testsuit_list:
137                                if suite_item == prefix_name:
138                                    is_match = True
139                                    break
140                            if not is_match:
141                                continue
142                        if not self.check_hap_test_file(suite_file):
143                            continue
144                        suite_file_dictionary.get("OHJST").append(suite_file)
145                    if self.get_hap_test_driver(suite_file) == "JSUnitTest":
146                        suite_file_dictionary.get("JST").append(suite_file)
147                elif suffix_name == ".py":
148                    if not self.check_python_test_file(suite_file):
149                        continue
150                    suite_file_dictionary.get("PYT").append(suite_file)
151                elif suffix_name == "":
152                    if file_name.startswith("rust_"):
153                        Scheduler.update_test_type_in_source("OHRust", DeviceTestType.oh_rust_test)
154                        suite_file_dictionary.get("OHRust").append(suite_file)
155                    else:
156                        suite_file_dictionary.get("CXX").append(suite_file)
157                elif suffix_name == ".bin":
158                    suite_file_dictionary.get("BIN").append(suite_file)
159
160        return suite_file_dictionary
161
162    def get_part_deps_files(self, external_deps_path, testpart):
163        LOG.info("external_deps_path:" + external_deps_path)
164        if os.path.exists(external_deps_path):
165            with open(external_deps_path, 'r') as json_file:
166                data_dic = json.load(json_file)
167                if not data_dic:
168                    LOG.error("data_dic is empty.")
169                    return []
170                test_part = testpart[0]
171                if test_part in data_dic.keys():
172                    external_deps_part_list = data_dic.get(test_part)
173                    LOG.info("external_deps_part_list = %s" % external_deps_part_list)
174                    return external_deps_part_list
175                else:
176                    LOG.error("%s is not in part deps info json." % test_part)
177        else:
178            LOG.error("Part deps info %s is not exist." % external_deps_path)
179        return []
180
181    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
182        # 如果xts测试指定了-tp,只有部件名与moduleInfo中part一致的文件才会加入最终执行的队列
183        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(xts_suite_file):
184            return False
185        # 如果xts测试指定了-ts,只有完全匹配的文件才会加入最终执行的队列
186        if options.testsuit != "":
187            testsuit_list = options.testsuit.split(";")
188            for suite_item in testsuit_list:
189                if suite_item == prefix_name:
190                    return True
191            return False
192        return True
193
194    def get_xts_test_files(self, xts_test_case_path, options):
195        LOG.info("xts test case path: " + xts_test_case_path)
196        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
197        if not os.path.exists(xts_test_case_path):
198            LOG.error("xts %s is not exist." % xts_test_case_path)
199            return xts_suit_file_dic
200        # 获取XTS测试用例输出目录下面的所有文件路径列表
201        xts_suite_file_list = get_file_list_by_postfix(xts_test_case_path)
202        for xts_suite_file in xts_suite_file_list:
203            file_name = os.path.basename(xts_suite_file)
204            prefix_name, suffix_name = os.path.splitext(file_name)
205            if not self.check_xts_config_match(options, prefix_name, xts_suite_file):
206                continue
207            if suffix_name == "":
208                if file_name == "HatsOpenPosixTest":
209                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
210                else:
211                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
212            elif suffix_name == ".hap":
213                if self.get_hap_test_driver(xts_suite_file) == "OHJSUnitTest":
214                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
215                if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest":
216                    xts_suit_file_dic.get("JST").append(xts_suite_file)
217        return xts_suit_file_dic
218
219    @classmethod
220    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
221        partlist = options.partname_list
222        testmodule = options.testmodule
223        testsuit = options.testsuit
224
225        if not suite_file.startswith(test_case_out_path):
226            return False
227
228        if testsuit != "":
229            short_name, _ = os.path.splitext(os.path.basename(suite_file))
230            testsuit_list = testsuit.split(',')
231            for test in testsuit_list:
232                if short_name.startswith(test) or \
233                        testsuit.startswith(short_name):
234                    return True
235            return False
236
237        is_valid_status = False
238        suitfile_subpath = suite_file.replace(test_case_out_path, "")
239        suitfile_subpath = suitfile_subpath.strip(os.sep)
240        if len(partlist) == 0:
241            if testmodule != "":
242                temp_list = suitfile_subpath.split(os.sep)
243                if len(temp_list) > 2 and testmodule == temp_list[1]:
244                    is_valid_status = True
245            else:
246                is_valid_status = True
247        else:
248            for partname in partlist:
249                if testmodule != "":
250                    if suitfile_subpath.startswith(
251                            partname + os.sep + testmodule + os.sep):
252                        is_valid_status = True
253                        break
254                else:
255                    if suitfile_subpath.startswith(partname + os.sep):
256                        is_valid_status = True
257                        break
258        return is_valid_status
259
260    @classmethod
261    def check_python_test_file(cls, suite_file):
262        if suite_file.endswith(".py"):
263            filename = os.path.basename(suite_file)
264            if filename.startswith("test_"):
265                return True
266        return False
267
268    @classmethod
269    def check_hap_test_file(cls, hap_file_path):
270        try:
271            if hap_file_path.endswith(".hap"):
272                json_file_path = hap_file_path.replace(".hap", ".json")
273                if os.path.exists(json_file_path):
274                    with open(json_file_path, 'r') as json_file:
275                        data_dic = json.load(json_file)
276                        if not data_dic:
277                            return False
278                        else:
279                            if "kits" in data_dic.keys():
280                                kits_list = data_dic.get("kits")
281                                if len(kits_list) > 0:
282                                    for kits_dict in kits_list:
283                                        if "test-file-name" not in kits_dict.keys():
284                                            continue
285                                        else:
286                                            return True
287                                else:
288                                    return False
289            return False
290        except JSONDecodeError:
291            return False
292        finally:
293            print(" check hap test file finally")
294
295    @classmethod
296    def get_hap_test_driver(cls, hap_file_path):
297        data_dic = cls.get_hap_json(hap_file_path)
298        if not data_dic:
299            return ""
300        else:
301            if "driver" in data_dic.keys():
302                driver_dict = data_dic.get("driver")
303                if bool(driver_dict):
304                    driver_type = driver_dict.get("type")
305                    return driver_type
306                else:
307                    LOG.error("%s has not set driver." % hap_file_path)
308                    return ""
309            else:
310                return ""
311
312    @classmethod
313    def get_hap_json(cls, hap_file_path):
314        if hap_file_path.endswith(".hap"):
315            json_file_path = hap_file_path.replace(".hap", ".json")
316            if os.path.exists(json_file_path):
317                with open(json_file_path, 'r') as json_file:
318                    data_dic = json.load(json_file)
319                    return data_dic
320            else:
321                return {}
322        else:
323            return {}
324
325    @classmethod
326    def get_hap_part_json(cls, hap_file_path):
327        if hap_file_path.endswith(".hap"):
328            json_file_path = hap_file_path.replace(".hap", ".moduleInfo")
329            if os.path.exists(json_file_path):
330                with open(json_file_path, 'r') as json_file:
331                    data_dic = json.load(json_file)
332                    return data_dic
333            else:
334                return {}
335        else:
336            return {}
337
338    @classmethod
339    def get_part_name_test_file(cls, hap_file_path):
340        data_dic = cls.get_hap_part_json(hap_file_path)
341        if not data_dic:
342            return ""
343        else:
344            if "part" in data_dic.keys():
345                part_name = data_dic["part"]
346                return part_name
347            else:
348                return ""
349