• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import platform
19import random
20import subprocess
21from pydoc import classname
22import time
23import os
24import sys
25import datetime
26import xml.etree.ElementTree as ElementTree
27
28from core.constants import SchedulerType
29from xdevice import Plugin
30from xdevice import get_plugin
31from xdevice import platform_logger
32from xdevice import Scheduler
33from xdevice import DeviceTestType
34from core.utils import get_build_output_path
35from core.utils import scan_support_product
36from core.utils import is_lite_product
37from core.common import is_open_source_product
38from core.command.parameter import Parameter
39from core.command.distribute_execute import DbinderTest
40from core.testcase.testcase_manager import TestCaseManager
41from core.config.config_manager import UserConfigManager
42from core.config.parse_parts_config import ParsePartsConfig
43from core.config.resource_manager import ResourceManager
44
45LOG = platform_logger("Run")
46
47
48class Run(object):
49
50    history_cmd_list = []
51
52    @classmethod
53    def get_history(self):
54        return self.history_cmd_list
55
56    def process_command_run(self, command, options):
57        current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" "))))
58        if options.coverage and platform.system() != "Windows":
59            if not options.pullgcda:
60                push_cov_path = os.path.join(sys.framework_root_dir, "localCoverage/push_coverage_so/push_coverage.py")
61                if os.path.exists(push_cov_path):
62                    if str(options.testpart) == "[]" and str(options.subsystem) == "[]":
63                        LOG.info("No subsystem or part input. Not push coverage so.")
64                    elif str(options.testpart) != "[]" and str(options.subsystem) != "[]":
65                        LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
66                    else:
67                        if str(options.testpart) != "[]":
68                            param = str(options.testpart)
69                            subprocess.run("python3 {} {} {}".format(
70                                push_cov_path, "testpart", param), shell=True)
71                        else:
72                            param = str(options.subsystem)
73                            subprocess.run("python3 {} {} {}".format(
74                                push_cov_path, "subsystem", param), shell=True)
75                else:
76                    print(f"{push_cov_path} not exists.")
77
78            init_gcov_path = os.path.join(sys.framework_root_dir, "localCoverage/resident_service/init_gcov.py")
79            if os.path.exists(init_gcov_path):
80                subprocess.run("python3 %s command_str=%s" % (
81                    init_gcov_path, current_raw_cmd), shell=True)
82            else:
83                print(f"{init_gcov_path} not exists.")
84
85        para = Parameter()
86        test_type_list = para.get_testtype_list(options.testtype)
87        if len(test_type_list) == 0:
88            LOG.error("The testtype parameter is incorrect.")
89            return
90        options.testtype = test_type_list
91
92        parser = ParsePartsConfig(options.productform)
93        partname_list = parser.get_part_list(
94            options.subsystem,
95            options.testpart)
96        options.partname_list = partname_list
97        options.coverage_outpath = self.get_coverage_outpath(options)
98
99        LOG.info("")
100        LOG.info("------------------------------------")
101        LOG.info("Input parameter:")
102        LOG.info("productform   = %s" % options.productform)
103        LOG.info("testtype      = %s" % str(options.testtype))
104        LOG.info("subsystem     = %s" % str(options.subsystem))
105        LOG.info("testpart      = %s" % str(options.testpart))
106        LOG.info("testmodule    = %s" % options.testmodule)
107        LOG.info("testsuit      = %s" % options.testsuit)
108        LOG.info("testcase      = %s" % options.testcase)
109        LOG.info("testlevel     = %s" % options.testlevel)
110        LOG.info("testargs     = %s" % options.testargs)
111        LOG.info("repeat     = %s" % options.repeat)
112        LOG.info("retry         = %s" % options.retry)
113        LOG.info("historylist   = %s" % options.historylist)
114        LOG.info("runhistory   = %s" % options.runhistory)
115        LOG.info("partname_list = %s" % str(options.partname_list))
116        LOG.info("partdeps = %s" % options.partdeps)
117        LOG.info("------------------------------------")
118        LOG.info("")
119
120        if not para.check_run_parameter(options):
121            LOG.error("Input parameter is incorrect.")
122            return
123
124        current_time = datetime.datetime.now()
125        #记录命令运行历史
126        need_record_history = False
127        cmd_record = {
128            "time" : str(current_time),
129            "raw_cmd" : options.current_raw_cmd,
130            "result" : "unknown",
131            "command": command,
132            "options": options
133        }
134        if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
135            or "--retry" in options.current_raw_cmd):
136            need_record_history = True
137
138        #打印历史记录
139        if options.historylist:
140            print("The latest command history is: %d" % len(self.history_cmd_list))
141            for index in range(0, len(self.history_cmd_list)):
142                cmd_record = self.history_cmd_list[index]
143                print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"],
144                      cmd_record["raw_cmd"], cmd_record["result"]))
145            return
146        #重新运行历史里的一条命令
147        if options.runhistory > 0:
148            #如果记录大于10则认为非法
149            if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
150                print("input history command[%d] out of range:", options.runhistory)
151                return
152            cmd_record = self.history_cmd_list[options.runhistory - 1]
153            print("run history command:", cmd_record["raw_cmd"])
154            need_record_history = False
155            command = cmd_record["command"]
156            options = cmd_record["options"]
157
158        if options.retry:
159            if len(self.history_cmd_list) <= 0:
160                LOG.info("No history command exsit")
161                return
162            history_cmd = self.history_cmd_list[-1]
163            command = history_cmd["command"]
164            options = history_cmd["options"]
165            from xdevice import Variables
166            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
167            tree = ElementTree.parse(latest_report_path)
168            root = tree.getroot()
169            has_failed_case = 0
170            test_targets = {}
171            fail_list = []
172            for child in root:
173                print(child.tag, ":", child.attrib)
174                for grand in child:
175                    print(grand.tag, ":", grand.attrib)
176                    for sub_child in grand:
177                        if sub_child.tag == 'failure':
178                            fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
179                            fail_list.append(fail_case)
180                            has_failed_case += 1
181                            break
182            test_targets["class"] = fail_list
183            setattr(options, "testargs", test_targets)
184            print("retry option:", options)
185            if has_failed_case > 0:
186                if not self._build_test_cases(options):
187                    LOG.error("Build test cases failed.")
188                    return
189                scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
190                                    plugin_id=SchedulerType.SCHEDULER)[0]
191                scheduler.exec_command(command, options)
192            else:
193                LOG.info("No testcase to retry")
194            return
195
196        if not self._build_test_cases(options):
197            LOG.error("Build test cases failed.")
198            return
199
200        if "partdeps" == options.partdeps:
201            self.get_part_deps_list(options.productform, options.testpart)
202            options.testcases_path = self.get_external_deps_out_path(options.productform)
203            LOG.info("partdeps = %s" % options.partdeps)
204
205        if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype:
206            test_dict = self.get_xts_test_dict(options)
207            options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype)
208            options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype)
209        else:
210            test_dict = self.get_test_dict(options)
211
212        if not self._check_test_dictionary(test_dict):
213            LOG.error("The test file list is empty.")
214            return
215        if options.coverage and platform.system() != "Windows":
216            coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
217            if os.path.exists(coverage_path):
218                coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True)
219                coverage_process.communicate()
220
221        if ("distributedtest" in options.testtype and
222                len(options.testtype) == 1):
223            from core.command.distribute_utils import get_test_case
224            from core.command.distribute_utils \
225                import check_ditributetest_environment
226            from core.command.distribute_utils import make_device_info_file
227            from core.command.distribute_utils import make_reports
228
229            local_time = time.localtime()
230            create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
231            start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
232
233            if not check_ditributetest_environment():
234                return
235
236            output_test = get_test_case(test_dict["CXX"])
237            if not output_test:
238                return
239
240            result_rootpath = os.path.join(sys.framework_root_dir,
241                "reports",
242                create_time)
243
244            log_path = os.path.join(result_rootpath, "log")
245            tmp_path = os.path.join(result_rootpath, "temp")
246            os.makedirs(log_path, exist_ok=True)
247            os.makedirs(tmp_path, exist_ok=True)
248
249            Scheduler.start_task_log(log_path)
250            make_device_info_file(tmp_path)
251
252            for case in output_test:
253                agent_target_name = case["agent_target_name"]
254                major_target_name = case["major_target_name"]
255                manager = DbinderTest(result_rootpath, case["suits_dir"])
256                manager.setUp()
257                manager.test_distribute(major_target_name, agent_target_name, options)
258                manager.tearDown()
259
260            make_reports(result_rootpath, start_time)
261            Scheduler.stop_task_logcat()
262        else:
263            options.testdict = test_dict
264            options.target_outpath = self.get_target_out_path(
265                options.productform)
266
267            scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
268                                   plugin_id=SchedulerType.SCHEDULER)[0]
269            if scheduler is None:
270                LOG.error("Can not find the scheduler plugin.")
271            else:
272                options.testcases_path = self.get_tests_out_path(options.productform)
273                options.resource_path = os.path.abspath(os.path.join(
274                    sys.framework_root_dir, "..", "resource"))
275                if is_lite_product(options.productform,
276                                   sys.source_code_root_path):
277                    if options.productform.find("wifiiot") != -1:
278                        scheduler.update_test_type_in_source(".bin",
279                            DeviceTestType.ctest_lite)
280                        scheduler.update_ext_type_in_source("BIN",
281                            DeviceTestType.ctest_lite)
282                    else:
283                        print("productform is not wifiiot")
284                scheduler.exec_command(command, options)
285        if need_record_history:
286            #读文件获取运行结果
287            from xdevice import Variables
288            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
289            with open(latest_report_path) as report_file:
290                for report_line in report_file:
291                    if "testsuites name=\"summary_report\"" in report_line:
292                        result = report_line.replace("\n", "")
293                        result = result.replace("<testsuites name=\"summary_report\" ", "")
294                        result = result.replace(">", "")
295                        cmd_record["result"] = result
296                        break
297            if len(self.history_cmd_list) >= 10:
298                del self.history_cmd_list[0]
299            self.history_cmd_list.append(cmd_record)
300
301        if options.coverage and platform.system() != "Windows":
302            pull_service_gcov_path = os.path.join(
303                sys.framework_root_dir, "localCoverage/resident_service/pull_service_gcda.py")
304            if os.path.exists(pull_service_gcov_path):
305                subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True)
306            else:
307                print(f"{pull_service_gcov_path} not exists.")
308
309            if not options.pullgcda:
310                cov_main_file_path = os.path.join(sys.framework_root_dir, "localCoverage/coverage_tools.py")
311                testpart = ",".join(list(map(str, options.partname_list)))
312                if os.path.exists(cov_main_file_path):
313                    subprocess.run("python3 %s testpart=%s" % (
314                        cov_main_file_path, testpart), shell=True)
315                else:
316                    print(f"{cov_main_file_path} not exists.")
317        return
318
319    ##############################################################
320    ##############################################################
321
322    @classmethod
323    def get_target_out_path(cls, product_form):
324        target_out_path = UserConfigManager().get_test_cases_dir()
325        if target_out_path == "":
326            target_out_path = os.path.join(
327                get_build_output_path(product_form),
328                "packages",
329                product_form)
330        target_out_path = os.path.abspath(target_out_path)
331        return target_out_path
332
333    @classmethod
334    def _build_test_cases(cls, options):
335        if options.coverage:
336            LOG.info("Coverage testing, no need to compile testcases")
337            return True
338
339        is_build_testcase = UserConfigManager().get_user_config_flag(
340            "build", "testcase")
341        project_root_path = sys.source_code_root_path
342        if is_build_testcase and project_root_path != "":
343            from core.build.build_manager import BuildManager
344            build_manager = BuildManager()
345            return build_manager.build_testcases(project_root_path, options)
346        else:
347            return True
348
349    @classmethod
350    def _check_test_dictionary(cls, test_dictionary):
351        is_valid_status = False
352        key_list = sorted(test_dictionary.keys())
353        for key in key_list:
354            file_list = test_dictionary[key]
355            if len(file_list) > 0:
356                is_valid_status = True
357                break
358        return is_valid_status
359
360    @classmethod
361    def get_tests_out_path(cls, product_form):
362        testcase_path = UserConfigManager().get_test_cases_dir()
363        if testcase_path == "":
364            all_product_list = scan_support_product()
365            if product_form in all_product_list:
366                if is_open_source_product(product_form):
367                    testcase_path = os.path.abspath(os.path.join(
368                        get_build_output_path(product_form),
369                        "tests"))
370                else:
371                    testcase_path = os.path.abspath(os.path.join(
372                        get_build_output_path(product_form),
373                        "tests"))
374            else:
375                testcase_path = os.path.join(
376                    get_build_output_path(product_form), "tests")
377        LOG.info("testcase_path=%s" % testcase_path)
378        return testcase_path
379
380    @classmethod
381    def get_xts_tests_out_path(cls, product_form, testtype):
382        xts_testcase_path = UserConfigManager().get_test_cases_dir()
383        if xts_testcase_path == "":
384            xts_testcase_path = os.path.abspath(os.path.join(
385                get_build_output_path(product_form),
386                "suites",
387                testtype[0],
388                "testcases"))
389        LOG.info("xts_testcase_path=%s" % xts_testcase_path)
390        return xts_testcase_path
391
392    @classmethod
393    def get_external_deps_out_path(cls, product_form):
394        external_deps_path = os.path.abspath(os.path.join(
395            get_build_output_path(product_form),
396            "part_deps_info",
397            "part_deps_info.json"))
398        LOG.info("external_deps_path=%s" % external_deps_path)
399        return external_deps_path
400
401    @classmethod
402    def get_coverage_outpath(cls, options):
403        coverage_out_path = ""
404        if options.coverage:
405            coverage_out_path = get_build_output_path(options.productform)
406            if coverage_out_path == "":
407                coverage_out_path = UserConfigManager().get_user_config(
408                    "coverage").get("outpath", "")
409            if coverage_out_path == "":
410                LOG.error("Coverage test: coverage_outpath is empty.")
411        return coverage_out_path
412
413    @classmethod
414    def get_part_deps_list(cls, productform, testpart):
415        #获取预处理部件间依赖的编译结果路径
416        external_deps_path = cls.get_external_deps_out_path(productform)
417        external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart)
418        return external_deps_path_list
419
420    def get_xts_test_dict(self, options):
421        # 获取XTS测试用例编译结果路径
422        xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype)
423        if not os.path.exists(xts_test_case_path):
424            LOG.error("%s is not exist." % xts_test_case_path)
425            return {}
426        xts_test_dict = TestCaseManager().get_xts_test_files(xts_test_case_path, options)
427        return xts_test_dict
428
429    def get_test_dict(self, options):
430        # 获取测试用例编译结果路径
431        test_case_path = self.get_tests_out_path(options.productform)
432        if not os.path.exists(test_case_path):
433            LOG.error("%s is not exist." % test_case_path)
434            return {}
435
436        test_dict = TestCaseManager().get_test_files(test_case_path, options)
437        return test_dict
438
439
440