• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import platform
19import random
20import shutil
21import subprocess
22from pydoc import classname
23import time
24import os
25import sys
26import datetime
27import xml.etree.ElementTree as ElementTree
28
29from core.constants import SchedulerType
30from xdevice import Plugin
31from xdevice import get_plugin
32from xdevice import platform_logger
33from xdevice import Scheduler
34from xdevice import DeviceTestType
35from core.utils import get_build_output_path
36from core.utils import scan_support_product
37from core.utils import is_lite_product
38from core.common import is_open_source_product
39from core.command.parameter import Parameter
40from core.command.distribute_execute import DbinderTest
41from core.testcase.testcase_manager import TestCaseManager
42from core.config.config_manager import UserConfigManager
43from core.config.parse_parts_config import ParsePartsConfig
44from core.config.resource_manager import ResourceManager
45
46LOG = platform_logger("Run")
47
48
49class Run(object):
50
51    history_cmd_list = []
52
53    @classmethod
54    def get_history(self):
55        return self.history_cmd_list
56
57    @classmethod
58    def get_target_out_path(cls, product_form):
59        target_out_path = UserConfigManager().get_test_cases_dir()
60        if target_out_path == "":
61            target_out_path = os.path.join(
62                get_build_output_path(product_form),
63                "packages",
64                product_form)
65        target_out_path = os.path.abspath(target_out_path)
66        return target_out_path
67
68    @classmethod
69    def _build_test_cases(cls, options):
70        if options.coverage:
71            LOG.info("Coverage testing, no need to compile testcases")
72            return True
73
74        is_build_testcase = UserConfigManager().get_user_config_flag(
75            "build", "testcase")
76        project_root_path = sys.source_code_root_path
77        if is_build_testcase and project_root_path != "":
78            from core.build.build_manager import BuildManager
79            build_manager = BuildManager()
80            return build_manager.build_testcases(project_root_path, options)
81        else:
82            return True
83
84    @classmethod
85    def _check_test_dictionary(cls, test_dictionary):
86        is_valid_status = False
87        key_list = sorted(test_dictionary.keys())
88        for key in key_list:
89            file_list = test_dictionary[key]
90            if len(file_list) > 0:
91                is_valid_status = True
92                break
93        return is_valid_status
94
95    @classmethod
96    def get_tests_out_path(cls, product_form):
97        testcase_path = UserConfigManager().get_test_cases_dir()
98        if testcase_path == "":
99            all_product_list = scan_support_product()
100            if product_form in all_product_list:
101                if is_open_source_product(product_form):
102                    testcase_path = os.path.abspath(os.path.join(
103                        get_build_output_path(product_form),
104                        "tests"))
105                else:
106                    testcase_path = os.path.abspath(os.path.join(
107                        get_build_output_path(product_form),
108                        "tests"))
109            else:
110                testcase_path = os.path.join(
111                    get_build_output_path(product_form), "tests")
112        LOG.info("testcase_path=%s" % testcase_path)
113        return testcase_path
114
115    @classmethod
116    def get_xts_tests_out_path(cls, product_form, testtype):
117        xts_testcase_path = UserConfigManager().get_test_cases_dir()
118        if xts_testcase_path == "":
119            xts_testcase_path = os.path.abspath(os.path.join(
120                get_build_output_path(product_form),
121                "suites",
122                testtype[0],
123                "testcases"))
124        LOG.info("xts_testcase_path=%s" % xts_testcase_path)
125        return xts_testcase_path
126
127    @classmethod
128    def get_external_deps_out_path(cls, product_form):
129        external_deps_path = os.path.abspath(os.path.join(
130            get_build_output_path(product_form),
131            "part_deps_info",
132            "part_deps_info.json"))
133        LOG.info("external_deps_path=%s" % external_deps_path)
134        return external_deps_path
135
136    @classmethod
137    def get_coverage_outpath(cls, options):
138        coverage_out_path = ""
139        if options.coverage:
140            coverage_out_path = get_build_output_path(options.productform)
141            if coverage_out_path == "":
142                coverage_out_path = UserConfigManager().get_user_config(
143                    "coverage").get("outpath", "")
144            if coverage_out_path == "":
145                LOG.error("Coverage test: coverage_outpath is empty.")
146        return coverage_out_path
147
148    @classmethod
149    def get_part_deps_list(cls, productform, testpart):
150        #获取预处理部件间依赖的编译结果路径
151        external_deps_path = cls.get_external_deps_out_path(productform)
152        external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart)
153        return external_deps_path_list
154
155    def process_command_run(self, command, options):
156        current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" "))))
157        if options.coverage and platform.system() != "Windows":
158            if not options.pullgcda:
159                push_cov_path = os.path.join(sys.framework_root_dir, "local_coverage/push_coverage_so/push_coverage.py")
160                if os.path.exists(push_cov_path):
161                    if str(options.testpart) == "[]" and str(options.subsystem) == "[]":
162                        LOG.info("No subsystem or part input. Not push coverage so.")
163                    elif str(options.testpart) != "[]" and str(options.subsystem) != "[]":
164                        LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
165                    else:
166                        if str(options.testpart) != "[]":
167                            param = str(options.testpart)
168                            subprocess.run("python3 {} {} {}".format(
169                                push_cov_path, "testpart", param), shell=True)
170                        else:
171                            param = str(options.subsystem)
172                            subprocess.run("python3 {} {} {}".format(
173                                push_cov_path, "subsystem", param), shell=True)
174                else:
175                    print(f"{push_cov_path} not exists.")
176
177            init_gcov_path = os.path.join(sys.framework_root_dir, "local_coverage/resident_service/init_gcov.py")
178            if os.path.exists(init_gcov_path):
179                subprocess.run("python3 %s command_str=%s" % (
180                    init_gcov_path, current_raw_cmd), shell=True)
181            else:
182                print(f"{init_gcov_path} not exists.")
183
184        para = Parameter()
185        test_type_list = para.get_testtype_list(options.testtype)
186        if len(test_type_list) == 0:
187            LOG.error("The testtype parameter is incorrect.")
188            return
189        options.testtype = test_type_list
190
191        parser = ParsePartsConfig(options.productform)
192        partname_list = parser.get_part_list(
193            options.subsystem,
194            options.testpart)
195        options.partname_list = partname_list
196        options.coverage_outpath = self.get_coverage_outpath(options)
197
198        LOG.info("")
199        LOG.info("------------------------------------")
200        LOG.info("Input parameter:")
201        LOG.info("productform   = %s" % options.productform)
202        LOG.info("testtype      = %s" % str(options.testtype))
203        LOG.info("subsystem     = %s" % str(options.subsystem))
204        LOG.info("testpart      = %s" % str(options.testpart))
205        LOG.info("testmodule    = %s" % options.testmodule)
206        LOG.info("testsuit      = %s" % options.testsuit)
207        LOG.info("testcase      = %s" % options.testcase)
208        LOG.info("testlevel     = %s" % options.testlevel)
209        LOG.info("testargs     = %s" % options.testargs)
210        LOG.info("repeat     = %s" % options.repeat)
211        LOG.info("retry         = %s" % options.retry)
212        LOG.info("historylist   = %s" % options.historylist)
213        LOG.info("runhistory   = %s" % options.runhistory)
214        LOG.info("partname_list = %s" % str(options.partname_list))
215        LOG.info("partdeps = %s" % options.partdeps)
216        LOG.info("------------------------------------")
217        LOG.info("")
218
219        if not para.check_run_parameter(options):
220            LOG.error("Input parameter is incorrect.")
221            return
222
223        current_time = datetime.datetime.now()
224        #记录命令运行历史
225        need_record_history = False
226        cmd_record = {
227            "time" : str(current_time),
228            "raw_cmd" : options.current_raw_cmd,
229            "result" : "unknown",
230            "command": command,
231            "options": options
232        }
233        if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
234            or "--retry" in options.current_raw_cmd):
235            need_record_history = True
236
237        #打印历史记录
238        if options.historylist:
239            print("The latest command history is: %d" % len(self.history_cmd_list))
240            for index, cmd_record in enumerate(self.history_cmd_list):
241                print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"],
242                      cmd_record["raw_cmd"], cmd_record["result"]))
243            return
244        #重新运行历史里的一条命令
245        if options.runhistory > 0:
246            #如果记录大于10则认为非法
247            if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
248                print("input history command[%d] out of range:", options.runhistory)
249                return
250            cmd_record = self.history_cmd_list[options.runhistory - 1]
251            print("run history command:", cmd_record["raw_cmd"])
252            need_record_history = False
253            command = cmd_record["command"]
254            options = cmd_record["options"]
255
256        if options.retry:
257            if len(self.history_cmd_list) <= 0:
258                LOG.info("No history command exsit")
259                return
260            history_cmd = self.history_cmd_list[-1]
261            command = history_cmd["command"]
262            options = history_cmd["options"]
263            from xdevice import Variables
264            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
265            tree = ElementTree.parse(latest_report_path)
266            root = tree.getroot()
267            has_failed_case = 0
268            test_targets = {}
269            fail_list = []
270            for child in root:
271                print(child.tag, ":", child.attrib)
272                for grand in child:
273                    print(grand.tag, ":", grand.attrib)
274                    for sub_child in grand:
275                        if sub_child.tag == 'failure':
276                            fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
277                            fail_list.append(fail_case)
278                            has_failed_case += 1
279                            break
280            test_targets["class"] = fail_list
281            setattr(options, "testargs", test_targets)
282            print("retry option:", options)
283            if has_failed_case > 0:
284                if not self._build_test_cases(options):
285                    LOG.error("Build test cases failed.")
286                    return
287                scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
288                                    plugin_id=SchedulerType.SCHEDULER)[0]
289                scheduler.exec_command(command, options)
290            else:
291                LOG.info("No testcase to retry")
292            return
293
294        if not self._build_test_cases(options):
295            LOG.error("Build test cases failed.")
296            return
297
298        if "partdeps" == options.partdeps:
299            self.get_part_deps_list(options.productform, options.testpart)
300            options.testcases_path = self.get_external_deps_out_path(options.productform)
301            LOG.info("partdeps = %s" % options.partdeps)
302
303        if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype:
304            test_dict = self.get_xts_test_dict(options)
305            options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype)
306            options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype)
307        else:
308            test_dict = self.get_test_dict(options)
309
310        if not self._check_test_dictionary(test_dict):
311            LOG.error("The test file list is empty.")
312            return
313        if options.coverage and platform.system() != "Windows":
314            coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
315            if os.path.exists(coverage_path):
316                coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True)
317                coverage_process.communicate()
318
319        if ("distributedtest" in options.testtype and
320                len(options.testtype) == 1):
321            from core.command.distribute_utils import get_test_case
322            from core.command.distribute_utils \
323                import check_ditributetest_environment
324            from core.command.distribute_utils import make_device_info_file
325            from core.command.distribute_utils import make_reports
326
327            local_time = time.localtime()
328            create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
329            start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
330
331            if not check_ditributetest_environment():
332                return
333
334            output_test = get_test_case(test_dict.get("CXX", None))
335            if not output_test:
336                return
337
338            result_rootpath = os.path.join(sys.framework_root_dir,
339                "reports",
340                create_time)
341
342            log_path = os.path.join(result_rootpath, "log")
343            tmp_path = os.path.join(result_rootpath, "temp")
344            os.makedirs(log_path, exist_ok=True)
345            os.makedirs(tmp_path, exist_ok=True)
346
347            Scheduler.start_task_log(log_path)
348            make_device_info_file(tmp_path)
349
350            for case in output_test:
351                agent_target_name = case["agent_target_name"]
352                major_target_name = case["major_target_name"]
353                manager = DbinderTest(result_rootpath, case["suits_dir"])
354                manager.setUp()
355                manager.test_distribute(major_target_name, agent_target_name, options)
356                manager.tearDown()
357
358            make_reports(result_rootpath, start_time)
359            Scheduler.stop_task_logcat()
360        else:
361            options.testdict = test_dict
362            options.target_outpath = self.get_target_out_path(
363                options.productform)
364
365            scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
366                                   plugin_id=SchedulerType.SCHEDULER)[0]
367            if scheduler is None:
368                LOG.error("Can not find the scheduler plugin.")
369            else:
370                options.testcases_path = self.get_tests_out_path(options.productform)
371                options.resource_path = os.path.abspath(os.path.join(
372                    sys.framework_root_dir, "..", "resource"))
373                if is_lite_product(options.productform,
374                                   sys.source_code_root_path):
375                    if options.productform.find("wifiiot") != -1:
376                        scheduler.update_test_type_in_source(".bin",
377                            DeviceTestType.ctest_lite)
378                        scheduler.update_ext_type_in_source("BIN",
379                            DeviceTestType.ctest_lite)
380                    else:
381                        print("productform is not wifiiot")
382                scheduler.exec_command(command, options)
383        if need_record_history:
384            #读文件获取运行结果
385            from xdevice import Variables
386            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
387            with open(latest_report_path) as report_file:
388                for report_line in report_file:
389                    if "testsuites name=\"summary_report\"" in report_line:
390                        result = report_line.replace("\n", "")
391                        result = result.replace("<testsuites name=\"summary_report\" ", "")
392                        result = result.replace(">", "")
393                        cmd_record["result"] = result
394                        break
395            if len(self.history_cmd_list) >= 10:
396                del self.history_cmd_list[0]
397            self.history_cmd_list.append(cmd_record)
398
399        if "fuzztest" == options.testtype[0] and options.coverage is False:
400            report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0]
401            latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus")
402            if os.path.exists(latest_corpus_path):
403                shutil.rmtree(latest_corpus_path)
404            shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path)
405
406        if options.coverage and platform.system() != "Windows":
407            pull_service_gcov_path = os.path.join(
408                sys.framework_root_dir, "local_coverage/resident_service/pull_service_gcda.py")
409            if os.path.exists(pull_service_gcov_path):
410                subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True)
411            else:
412                print(f"{pull_service_gcov_path} not exists.")
413
414            if not options.pullgcda:
415                cov_main_file_path = os.path.join(sys.framework_root_dir, "local_coverage/coverage_tools.py")
416                testpart = ",".join(list(map(str, options.partname_list)))
417                if os.path.exists(cov_main_file_path):
418                    subprocess.run("python3 %s testpart=%s" % (
419                        cov_main_file_path, testpart), shell=True)
420                else:
421                    print(f"{cov_main_file_path} not exists.")
422        return
423
424    def get_xts_test_dict(self, options):
425        # 获取XTS测试用例编译结果路径
426        xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype)
427        if not os.path.exists(xts_test_case_path):
428            LOG.error("%s is not exist." % xts_test_case_path)
429            return {}
430        xts_test_dict = TestCaseManager().get_xts_test_files(xts_test_case_path, options)
431        return xts_test_dict
432
433    def get_test_dict(self, options):
434        # 获取测试用例编译结果路径
435        test_case_path = self.get_tests_out_path(options.productform)
436        if not os.path.exists(test_case_path):
437            LOG.error("%s is not exist." % test_case_path)
438            return {}
439
440        test_dict = TestCaseManager().get_test_files(test_case_path, options)
441        return test_dict
442