• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19from pydoc import classname
20import time
21import os
22import sys
23import datetime
24import xml.etree.ElementTree as ElementTree
25
26from core.constants import SchedulerType
27from xdevice import Plugin
28from xdevice import get_plugin
29from xdevice import platform_logger
30from xdevice import Scheduler
31from xdevice import DeviceTestType
32from core.utils import get_build_output_path
33from core.utils import scan_support_product
34from core.utils import is_lite_product
35from core.common import is_open_source_product
36from core.command.parameter import Parameter
37from core.command.distribute_execute import DbinderTest
38from core.testcase.testcase_manager import TestCaseManager
39from core.config.config_manager import UserConfigManager
40from core.config.parse_parts_config import ParsePartsConfig
41from core.config.resource_manager import ResourceManager
42
43LOG = platform_logger("Run")
44
45
46class Run(object):
47
48    history_cmd_list = []
49
50    @classmethod
51    def get_history(self):
52        return self.history_cmd_list
53
54    def process_command_run(self, command, options):
55        para = Parameter()
56        test_type_list = para.get_testtype_list(options.testtype)
57        if len(test_type_list) == 0:
58            LOG.error("The testtype parameter is incorrect.")
59            return
60        options.testtype = test_type_list
61
62        parser = ParsePartsConfig(options.productform)
63        partname_list = parser.get_part_list(
64            options.subsystem,
65            options.testpart)
66        options.partname_list = partname_list
67        options.coverage_outpath = self.get_coverage_outpath(options)
68
69        LOG.info("")
70        LOG.info("------------------------------------")
71        LOG.info("Input parameter:")
72        LOG.info("productform   = %s" % options.productform)
73        LOG.info("testtype      = %s" % str(options.testtype))
74        LOG.info("subsystem     = %s" % str(options.subsystem))
75        LOG.info("testpart      = %s" % str(options.testpart))
76        LOG.info("testmodule    = %s" % options.testmodule)
77        LOG.info("testsuit      = %s" % options.testsuit)
78        LOG.info("testcase      = %s" % options.testcase)
79        LOG.info("testlevel     = %s" % options.testlevel)
80        LOG.info("testargs     = %s" % options.testargs)
81        LOG.info("repeat     = %s" % options.repeat)
82        LOG.info("retry         = %s" % options.retry)
83        LOG.info("historylist   = %s" % options.historylist)
84        LOG.info("runhistory   = %s" % options.runhistory)
85        LOG.info("partname_list = %s" % str(options.partname_list))
86        LOG.info("------------------------------------")
87        LOG.info("")
88
89        if not para.check_run_parameter(options):
90            LOG.error("Input parameter is incorrect.")
91            return
92
93        current_time = datetime.datetime.now()
94        #记录命令运行历史
95        need_record_history = False
96        cmd_record = {
97            "time" : str(current_time),
98            "raw_cmd" : options.current_raw_cmd,
99            "result" : "unknown",
100            "command": command,
101            "options": options
102        }
103        if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
104            or "--retry" in options.current_raw_cmd):
105            need_record_history = True
106
107        #打印历史记录
108        if options.historylist:
109            print("The latest command history is: %d" % len(self.history_cmd_list))
110            for index in range(0, len(self.history_cmd_list)):
111                cmd_record = self.history_cmd_list[index]
112                print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"],
113                      cmd_record["raw_cmd"], cmd_record["result"]))
114            return
115        #重新运行历史里的一条命令
116        if options.runhistory > 0:
117            #如果记录大于10则认为非法
118            if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
119                print("input history command[%d] out of range:", options.runhistory)
120                return
121            cmd_record = self.history_cmd_list[options.runhistory - 1]
122            print("run history command:", cmd_record["raw_cmd"])
123            need_record_history = False
124            command = cmd_record["command"]
125            options = cmd_record["options"]
126
127        if options.retry:
128            if len(self.history_cmd_list) <= 0:
129                LOG.info("No history command exsit")
130                return
131            history_cmd = self.history_cmd_list[-1]
132            command = history_cmd["command"]
133            options = history_cmd["options"]
134            latest_report_path = os.path.join(
135                sys.framework_root_dir, "reports/latest/summary_report.xml")
136            tree = ElementTree.parse(latest_report_path)
137            root = tree.getroot()
138            has_failed_case = 0
139            test_targets = {}
140            fail_list = []
141            for child in root:
142                print(child.tag, ":", child.attrib)
143                for grand in child:
144                    print(grand.tag, ":", grand.attrib)
145                    if grand.attrib["result"] == 'false':
146                        fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
147                        fail_list.append(fail_case)
148                        has_failed_case += 1
149            test_targets["class"] = fail_list
150            setattr(options, "testargs", test_targets)
151            print("retry option:", options)
152            if has_failed_case > 0:
153                if not self._build_test_cases(options):
154                    LOG.error("Build test cases failed.")
155                    return
156                scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
157                                    plugin_id=SchedulerType.SCHEDULER)[0]
158                scheduler.exec_command(command, options)
159            else:
160                LOG.info("No testcase to retry")
161            return
162
163        if not self._build_test_cases(options):
164            LOG.error("Build test cases failed.")
165            return
166
167        if "actstest" in options.testtype:
168            test_dict = self.get_acts_test_dict(options)
169            options.testcases_path = self.get_acts_tests_out_path(options.productform)
170            options.resource_path = self.get_acts_tests_out_path(options.productform)
171        else:
172            test_dict = self.get_test_dict(options)
173
174        if not self._check_test_dictionary(test_dict):
175            LOG.error("The test file list is empty.")
176            return
177
178        if ("distributedtest" in options.testtype and
179                len(options.testtype) == 1):
180            from core.command.distribute_utils import get_test_case
181            from core.command.distribute_utils \
182                import check_ditributetest_environment
183            from core.command.distribute_utils import make_device_info_file
184            from core.command.distribute_utils import make_reports
185
186            local_time = time.localtime()
187            create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
188            start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
189
190            if not check_ditributetest_environment():
191                return
192
193            output_test = get_test_case(test_dict["CXX"])
194            if not output_test:
195                return
196
197            result_rootpath = os.path.join(sys.framework_root_dir,
198                "reports",
199                create_time)
200
201            log_path = os.path.join(result_rootpath, "log")
202            tmp_path = os.path.join(result_rootpath, "temp")
203            os.makedirs(log_path, exist_ok=True)
204            os.makedirs(tmp_path, exist_ok=True)
205
206            Scheduler.start_task_log(log_path)
207            make_device_info_file(tmp_path)
208
209            for case in output_test:
210                agent_target_name = case["agent_target_name"]
211                major_target_name = case["major_target_name"]
212                manager = DbinderTest(result_rootpath, case["suits_dir"])
213                manager.setUp()
214                manager.test_distribute(major_target_name, agent_target_name, options)
215                manager.tearDown()
216
217            make_reports(result_rootpath, start_time)
218            Scheduler.stop_task_logcat()
219        else:
220            options.testdict = test_dict
221            options.target_outpath = self.get_target_out_path(
222                options.productform)
223
224            scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
225                                   plugin_id=SchedulerType.SCHEDULER)[0]
226            if scheduler is None:
227                LOG.error("Can not find the scheduler plugin.")
228            else:
229                if is_lite_product(options.productform,
230                                   sys.source_code_root_path):
231                    options.testcases_path = options.target_outpath
232                    options.resource_path = os.path.abspath(os.path.join(
233                        sys.framework_root_dir, "..", "resource"))
234                    if options.productform.find("wifiiot") != -1:
235                        scheduler.update_test_type_in_source(".bin",
236                            DeviceTestType.ctest_lite)
237                        scheduler.update_ext_type_in_source("BIN",
238                            DeviceTestType.ctest_lite)
239                    else:
240                        print("productform is not wifiiot")
241                scheduler.exec_command(command, options)
242        if need_record_history:
243            #读文件获取运行结果
244            latest_report_path = os.path.join(sys.framework_root_dir, "reports/latest/summary_report.xml")
245            with open(latest_report_path) as report_file:
246                for report_line in report_file:
247                    if "testsuites name=\"summary_report\"" in report_line:
248                        result = report_line.replace("\n", "")
249                        result = result.replace("<testsuites name=\"summary_report\" ", "")
250                        result = result.replace(">", "")
251                        cmd_record["result"] = result
252                        break
253            if len(self.history_cmd_list) >= 10:
254                del self.history_cmd_list[0]
255            self.history_cmd_list.append(cmd_record)
256        print("-------------run end: ", self.history_cmd_list)
257        return
258
259    ##############################################################
260    ##############################################################
261
262    @classmethod
263    def get_target_out_path(cls, product_form):
264        target_out_path = UserConfigManager().get_test_cases_dir()
265        if target_out_path == "":
266            target_out_path = os.path.join(
267                get_build_output_path(product_form),
268                "packages",
269                product_form)
270        target_out_path = os.path.abspath(target_out_path)
271        return target_out_path
272
273    @classmethod
274    def _build_test_cases(cls, options):
275        if options.coverage:
276            LOG.info("Coverage testing, no need to compile testcases")
277            return True
278
279        is_build_testcase = UserConfigManager().get_user_config_flag(
280            "build", "testcase")
281        project_root_path = sys.source_code_root_path
282        if is_build_testcase and project_root_path != "":
283            from core.build.build_manager import BuildManager
284            build_manager = BuildManager()
285            return build_manager.build_testcases(project_root_path, options)
286        else:
287            return True
288
289    @classmethod
290    def _check_test_dictionary(cls, test_dictionary):
291        is_valid_status = False
292        key_list = sorted(test_dictionary.keys())
293        for key in key_list:
294            file_list = test_dictionary[key]
295            if len(file_list) > 0:
296                is_valid_status = True
297                break
298        return is_valid_status
299
300    @classmethod
301    def get_tests_out_path(cls, product_form):
302        testcase_path = UserConfigManager().get_test_cases_dir()
303        if testcase_path == "":
304            all_product_list = scan_support_product()
305            if product_form in all_product_list:
306                if is_open_source_product(product_form):
307                    testcase_path = os.path.abspath(os.path.join(
308                        get_build_output_path(product_form),
309                        "packages",
310                        "phone",
311                        "tests"))
312                else:
313                    testcase_path = os.path.abspath(os.path.join(
314                        get_build_output_path(product_form),
315                        "packages",
316                        product_form,
317                        "tests"))
318            else:
319                testcase_path = os.path.join(
320                    get_build_output_path(product_form), "tests")
321        LOG.info("testcase_path=%s" % testcase_path)
322        return testcase_path
323
324    @classmethod
325    def get_acts_tests_out_path(cls, product_form):
326        acts_testcase_path = os.path.abspath(os.path.join(
327            get_build_output_path(product_form),
328            "suites",
329            "acts",
330            "testcases"))
331        LOG.info("acts_testcase_path=%s" % acts_testcase_path)
332        return acts_testcase_path
333
334    @classmethod
335    def get_coverage_outpath(cls, options):
336        coverage_out_path = ""
337        if options.coverage:
338            coverage_out_path = get_build_output_path(options.productform)
339            if coverage_out_path == "":
340                coverage_out_path = UserConfigManager().get_user_config(
341                    "coverage").get("outpath", "")
342            if coverage_out_path == "":
343                LOG.error("Coverage test: coverage_outpath is empty.")
344        return coverage_out_path
345
346    def get_acts_test_dict(self, options):
347        # 获取测试用例编译结果路径
348        acts_test_case_path = self.get_acts_tests_out_path(options.productform)
349        acts_test_dict = TestCaseManager().get_acts_test_files(acts_test_case_path, options)
350        return acts_test_dict
351
352    def get_test_dict(self, options):
353        # 获取测试用例编译结果路径
354        test_case_path = self.get_tests_out_path(options.productform)
355        if not os.path.exists(test_case_path):
356            LOG.error("%s is not exist." % test_case_path)
357            return
358
359        test_dict = TestCaseManager().get_test_files(test_case_path, options)
360        return test_dict
361
362
363