• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import time
20import os
21import sys
22
23from core.constants import SchedulerType
24from xdevice import Plugin
25from xdevice import get_plugin
26from xdevice import platform_logger
27from xdevice import Scheduler
28from xdevice import DeviceTestType
29from core.utils import get_build_output_path
30from core.utils import scan_support_product
31from core.utils import is_lite_product
32from core.common import is_open_source_product
33from core.command.parameter import Parameter
34from core.testcase.testcase_manager import TestCaseManager
35from core.config.config_manager import UserConfigManager
36from core.config.parse_parts_config import ParsePartsConfig
37
38LOG = platform_logger("Run")
39
40
41class Run(object):
42    def process_command_run(self, command, options):
43        para = Parameter()
44        test_type_list = para.get_testtype_list(options.testtype)
45        if len(test_type_list) == 0:
46            LOG.error("The testtype parameter is incorrect.")
47            return
48        options.testtype = test_type_list
49
50        parser = ParsePartsConfig(options.productform)
51        partname_list = parser.get_part_list(
52            options.subsystem,
53            options.testpart)
54        options.partname_list = partname_list
55        options.coverage_outpath = self.get_coverage_outpath(options)
56
57        LOG.info("")
58        LOG.info("------------------------------------")
59        LOG.info("Input parameter:")
60        LOG.info("productform   = %s" % options.productform)
61        LOG.info("testtype      = %s" % str(options.testtype))
62        LOG.info("subsystem     = %s" % str(options.subsystem))
63        LOG.info("testpart      = %s" % str(options.testpart))
64        LOG.info("testmodule    = %s" % options.testmodule)
65        LOG.info("testsuit      = %s" % options.testsuit)
66        LOG.info("testcase      = %s" % options.testcase)
67        LOG.info("testlevel     = %s" % options.testlevel)
68        LOG.info("partname_list = %s" % str(options.partname_list))
69        LOG.info("------------------------------------")
70        LOG.info("")
71
72        if not para.check_run_parameter(options):
73            LOG.error("Input parameter is incorrect.")
74            return
75
76        if not self._build_test_cases(options):
77            LOG.error("Build test cases failed.")
78            return
79
80        test_case_path = self.get_tests_out_path(options.productform)
81        if not os.path.exists(test_case_path):
82            LOG.error("%s is not exist." % test_case_path)
83            return
84
85        test_dict = TestCaseManager().get_test_files(test_case_path, options)
86        if not self._check_test_dictionary(test_dict):
87            LOG.error("The test file list is empty.")
88            return
89
90        if ("distributedtest" in options.testtype and
91                len(options.testtype) == 1):
92            from core.command.distribute_utils \
93                import check_ditributetest_environment
94            from core.command.distribute_utils import make_device_info_file
95            from core.command.distribute_utils \
96                import execute_distribute_test_file
97            from core.command.distribute_utils import make_reports
98
99            local_time = time.localtime()
100            create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
101            start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
102
103            if not check_ditributetest_environment():
104                return
105
106            result_rootpath = os.path.join(sys.framework_root_dir,
107                "reports",
108                create_time)
109            print(result_rootpath)
110
111            log_path = os.path.join(result_rootpath, "log")
112            tmp_path = os.path.join(result_rootpath, "temp")
113            os.makedirs(log_path, exist_ok=True)
114            os.makedirs(tmp_path, exist_ok=True)
115
116            Scheduler.start_task_log(log_path)
117            make_device_info_file(tmp_path)
118
119            pyfile_list = test_dict["PYT"]
120            for index, element in enumerate(pyfile_list):
121                LOG.info("[%s / %s] Executing: %s" % (index + 1,
122                    len(pyfile_list), element))
123                execute_distribute_test_file(element, result_rootpath)
124
125            make_reports(result_rootpath, start_time)
126            Scheduler.stop_task_log()
127        else:
128            options.testdict = test_dict
129            options.target_outpath = self.get_target_out_path(
130                options.productform)
131
132            scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
133                                   plugin_id=SchedulerType.SCHEDULER)[0]
134            if scheduler is None:
135                LOG.error("Can not find the scheduler plugin.")
136            else:
137                if is_lite_product(options.productform,
138                                   sys.source_code_root_path):
139                    options.testcases_path = options.target_outpath
140                    options.resource_path = os.path.abspath(os.path.join(
141                        sys.framework_root_dir, "..", "resource"))
142                    print(options.testcases_path)
143                    print(options.resource_path)
144                    if options.productform.find("wifiiot") != -1:
145                        scheduler.update_test_type_in_source(".bin",
146                            DeviceTestType.ctest_lite)
147                        scheduler.update_ext_type_in_source("BIN",
148                            DeviceTestType.ctest_lite)
149                    else:
150                        print("productform is not wifiiot")
151                scheduler.exec_command(command, options)
152        return
153
154    ##############################################################
155    ##############################################################
156
157    @classmethod
158    def get_target_out_path(cls, product_form):
159        target_out_path = UserConfigManager().get_test_cases_dir()
160        if target_out_path == "":
161            target_out_path = os.path.join(
162                get_build_output_path(product_form),
163                "packages",
164                product_form)
165        target_out_path = os.path.abspath(target_out_path)
166        return target_out_path
167
168    @classmethod
169    def _build_test_cases(cls, options):
170        if options.coverage:
171            LOG.info("Coverage testing, no need to compile testcases")
172            return True
173
174        is_build_testcase = UserConfigManager().get_user_config_flag(
175            "build", "testcase")
176        project_root_path = sys.source_code_root_path
177        if is_build_testcase and project_root_path != "":
178            from core.build.build_manager import BuildManager
179            build_manager = BuildManager()
180            return build_manager.build_testcases(project_root_path, options)
181        else:
182            return True
183
184    @classmethod
185    def _check_test_dictionary(cls, test_dictionary):
186        is_valid_status = False
187        key_list = sorted(test_dictionary.keys())
188        for key in key_list:
189            file_list = test_dictionary[key]
190            if len(file_list) > 0:
191                is_valid_status = True
192                break
193        return is_valid_status
194
195    @classmethod
196    def get_tests_out_path(cls, product_form):
197        testcase_path = UserConfigManager().get_test_cases_dir()
198        if testcase_path == "":
199            all_product_list = scan_support_product()
200            if product_form in all_product_list:
201                if is_open_source_product(product_form):
202                    testcase_path = os.path.abspath(os.path.join(
203                        get_build_output_path(product_form),
204                        "packages",
205                        "phone",
206                        "tests"))
207                else:
208                    testcase_path = os.path.abspath(os.path.join(
209                        get_build_output_path(product_form),
210                        "packages",
211                        product_form,
212                        "tests"))
213            else:
214                testcase_path = os.path.join(
215                    get_build_output_path(product_form), "test")
216        LOG.info("testcase_path=%s" % testcase_path)
217        return testcase_path
218
219    @classmethod
220    def get_coverage_outpath(cls, options):
221        coverage_out_path = ""
222        if options.coverage:
223            coverage_out_path = get_build_output_path(options.productform)
224            if coverage_out_path == "":
225                coverage_out_path = UserConfigManager().get_user_config(
226                    "coverage").get("outpath", "")
227            if coverage_out_path == "":
228                LOG.error("Coverage test: coverage_outpath is empty.")
229        return coverage_out_path
230
231
232