#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import json
import copy
import stat
from collections import namedtuple

from _core.constants import DeviceTestType
from _core.constants import ModeType
from _core.constants import HostDrivenTestType
from _core.constants import ConfigConst
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_filename_extension
from _core.utils import is_config_str
from _core.utils import unique_id


__all__ = ["TestSetSource", "TestSource", "find_test_descriptors",
           "find_testdict_descriptors", "TestDictSource"]

TestSetSource = namedtuple('TestSetSource', 'set')
TestSource = namedtuple('TestSource', 'source_file source_string config_file '
                                      'test_name test_type module_name')

# Default mapping from test-dict keys to test types
# (copied into TestDictSource.test_type by TestDictSource.reset()).
TEST_TYPE_DICT = {"HAP": DeviceTestType.hap_test,
                  "PYT": HostDrivenTestType.device_test,
                  "JST": DeviceTestType.jsunit_test,
                  "CXX": DeviceTestType.cpp_test,
                  "BIN": DeviceTestType.lite_cpp_test}
# Default mapping from file extensions to test types
# (copied into TestDictSource.exe_type by TestDictSource.reset()).
EXT_TYPE_DICT = {".hap": DeviceTestType.hap_test,
                 ".py": HostDrivenTestType.device_test,
                 ".js": DeviceTestType.jsunit_test,
                 ".bin": DeviceTestType.lite_cpp_test,
                 "default": DeviceTestType.cpp_test}
PY_SUFFIX = ".py"
PYD_SUFFIX = ".pyd"
MODULE_CONFIG_SUFFIX = ".json"
MODULE_INFO_SUFFIX = ".moduleInfo"
# Upper bound used by _get_testcase_config_file when walking up directories.
MAX_DIR_DEPTH = 6
LOG = platform_logger("TestSource")


def find_test_descriptors(config):
    """Build test descriptors from the test selection options in *config*.

    Returns None when none of config.testfile, config.testlist, config.task
    and config.testcase is set; otherwise the list produced by
    _make_test_descriptors_from_testsources.

    Raises:
        ParamError: propagated from the helpers, e.g. when no test source
            is found or a test file does not exist.
    """
    if not config.testfile and not config.testlist and not config.task and \
            not config.testcase:
        return None

    # get test sources
    testcases_dirs = _get_testcases_dirs(config)
    test_sources = _get_test_sources(config, testcases_dirs)
    LOG.debug("test sources: %s", test_sources)

    # normalize test sources
    test_sources = _normalize_test_sources(testcases_dirs, test_sources,
                                           config)

    # make test descriptors
    test_descriptors = _make_test_descriptors_from_testsources(test_sources,
                                                               config)
    return test_descriptors


def _get_testcases_dirs(config):
    """Return the ordered list of directories searched for test sources.

    The order is: config.testcases_path and its subfolders, the
    "testcases" directory under Variables.top_dir and its subfolders (only
    when config.testcases_path is set and differs from it), then
    Variables.exec_dir and, if different, Variables.top_dir.
    """
    # imported here rather than at module level
    from xdevice import Variables
    # add config.testcases_path and its subfolders
    testcases_dirs = []
    if getattr(config, ConfigConst.testcases_path, ""):
        testcases_dirs = [config.testcases_path]
        _append_subfolders(config.testcases_path, testcases_dirs)

    # add inner testcases dir and its subfolders
    inner_testcases_dir = os.path.abspath(os.path.join(
        Variables.top_dir, "testcases"))
    if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase(
            config.testcases_path) != os.path.normcase(inner_testcases_dir):
        testcases_dirs.append(inner_testcases_dir)
        _append_subfolders(inner_testcases_dir, testcases_dirs)

    # add execution dir and top dir
    testcases_dirs.append(Variables.exec_dir)
    if os.path.normcase(Variables.exec_dir) != os.path.normcase(
            Variables.top_dir):
        testcases_dirs.append(Variables.top_dir)

    LOG.debug("testcases directories: %s", testcases_dirs)
    return testcases_dirs


def _append_subfolders(testcases_path, testcases_dirs):
    """Append the absolute path of every directory below *testcases_path*
    (recursively, via os.walk) to the list *testcases_dirs*, in place.
    """
    for root, dirs, _ in os.walk(testcases_path):
        for sub_dir in dirs:
            testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir)))


def find_testdict_descriptors(config):
    """Build test descriptors from config.testdict.

    config.testdict is iterated as a mapping of test type key to a
    collection of file names; relative names are resolved against
    Variables.exec_dir. Only existing files whose key is present in
    TestDictSource.test_type produce a descriptor.

    Returns None when config.testdict is missing or equal to "".

    Raises:
        ParamError: when no descriptor could be created.
    """
    from xdevice import Variables
    if getattr(config, ConfigConst.testdict, "") == "":
        return None
    testdict = config.testdict
    test_descriptors = []
    for test_type_key, files in testdict.items():
        for file_name in files:
            if not os.path.isabs(file_name):
                file_name = os.path.join(Variables.exec_dir, file_name)
            if os.path.isfile(file_name) and \
                    test_type_key in TestDictSource.test_type:
                desc = _make_test_descriptor(os.path.abspath(file_name),
                                             test_type_key)
                if desc is not None:
                    test_descriptors.append(desc)
    if not test_descriptors:
        raise ParamError("test source is none", error_no="00110")
    return test_descriptors


def _append_component_test_source(config, testcases_dir, test_sources):
    """Append test sources described by *.moduleInfo files in *testcases_dir*.

    Each info file is parsed as JSON and must provide non-empty
    "module_name", "part_name" and "subsystem" values, otherwise it is
    skipped. When config.subsystems or config.parts is non-empty, the
    module is kept only if its part or subsystem is listed. For every kept
    module the path "<info file dir>/<module_name>" is appended to
    *test_sources* and the config's component mapper attribute is updated
    with {module_name: (subsystem_name, part_name)}.
    """
    subsystem_list = config.subsystems if config.subsystems else list()
    part_list = config.parts if config.parts else list()
    module_info_files = _get_component_info_file(testcases_dir)
    for info_file in module_info_files:
        # Use a fresh dict per info file so that a file lacking one of the
        # keys does not inherit the value read from a previous file.
        result_dict = dict()
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
            result_dict.update(json.load(f_handler))
        module_name = result_dict.get("module_name", "")
        part_name = result_dict.get("part_name", "")
        subsystem_name = result_dict.get("subsystem", "")
        if not module_name or not part_name or not subsystem_name:
            continue
        module_config_file = \
            os.path.join(os.path.dirname(info_file), module_name)
        is_append = True
        if subsystem_list or part_list:
            if part_name not in part_list and \
                    subsystem_name not in subsystem_list:
                is_append = False
        if is_append:
            getattr(config, ConfigConst.component_mapper, dict()).update(
                {module_name: (subsystem_name, part_name)})
            test_sources.append(module_config_file)


def _get_test_sources(config, testcases_dirs):
    """Collect the raw (not yet normalized) test sources selected by *config*.

    Sources come from, in order of precedence:
      * every *.json file directly inside *testcases_dirs* when only
        config.task is set;
      * config.testlist (";"-separated);
      * the lines of config.testfile, followed by config.testcase
        (";"-separated) if set;
      * *.moduleInfo files when subsystems, parts or the component base
        kit option is set.
    """
    test_sources = []

    # get test sources from testcases_dirs
    if not config.testfile and not config.testlist and not config.testcase \
            and not config.subsystems and not config.parts and not \
            getattr(config, ConfigConst.component_base_kit, "") and \
            config.task:
        for testcases_dir in testcases_dirs:
            _append_module_test_source(testcases_dir, test_sources)
        return test_sources

    # get test sources from config.testlist
    if getattr(config, ConfigConst.testlist, ""):
        for test_source in config.testlist.split(";"):
            if test_source.strip():
                test_sources.append(test_source.strip())
        return test_sources

    # get test sources from config.testfile
    # (no return here: config.testcase entries below are appended as well)
    if getattr(config, ConfigConst.testfile, ""):
        test_file = _get_test_file(config, testcases_dirs)
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(test_file, flags, modes), "r") as file_content:
            for line in file_content:
                if line.strip():
                    test_sources.append(line.strip())

    # get test sources from config.testcase
    if getattr(config, ConfigConst.testcase, ""):
        for test_source in config.testcase.split(";"):
            if test_source.strip():
                test_sources.append(test_source.strip())
        return test_sources

    # get test sources according *.moduleInfo file
    if getattr(config, ConfigConst.subsystems, []) or getattr(
            config, ConfigConst.parts, []) or \
            getattr(config, ConfigConst.component_base_kit, ""):
        setattr(config, ConfigConst.component_mapper, dict())
        for testcases_dir in testcases_dirs:
            _append_component_test_source(config, testcases_dir, test_sources)
        return test_sources
    return test_sources


def _append_module_test_source(testcases_path, test_sources):
    """Append every *.json file directly inside *testcases_path* to
    *test_sources*; do nothing when *testcases_path* is not a directory.
    """
    if not os.path.isdir(testcases_path):
        return
    for item in os.listdir(testcases_path):
        item_path = os.path.join(testcases_path, item)
        if os.path.isfile(item_path) and item_path.endswith(
                MODULE_CONFIG_SUFFIX):
            test_sources.append(item_path)


def _get_test_file(config, testcases_dirs):
    """Resolve config.testfile to an existing path.

    An absolute path is returned as is when it exists; a relative path is
    looked up in each of *testcases_dirs* in order.

    Raises:
        ParamError: when the file cannot be found.
    """
    if os.path.isabs(config.testfile):
        if os.path.exists(config.testfile):
            return config.testfile
        else:
            raise ParamError("test file '%s' not exists" % config.testfile,
                             error_no="00110")

    for testcases_dir in testcases_dirs:
        test_file = os.path.join(testcases_dir, config.testfile)
        if os.path.exists(test_file):
            return test_file

    # same error number as the absolute-path branch above
    raise ParamError("test file '%s' not exists" % config.testfile,
                     error_no="00110")


def _normalize_test_sources(testcases_dirs, test_sources, config):
    """Resolve each test source to an existing file where possible.

    Every source is tried against *testcases_dirs* in order; the first
    match is kept. A source that matches nowhere is kept unchanged.

    Raises:
        ParamError: when the resulting list is empty.
    """
    norm_test_sources = []
    for test_source in test_sources:
        append_result = False
        for testcases_dir in testcases_dirs:
            # append test source absolute path
            append_result = _append_norm_test_source(
                norm_test_sources, test_source, testcases_dir, config)
            if append_result:
                break

        # append test source if no corresponding file founded
        if not append_result:
            norm_test_sources.append(test_source)
    if not norm_test_sources:
        raise ParamError("test source not found")
    return norm_test_sources


def _append_norm_test_source(norm_test_sources, test_source, testcases_dir,
                             config):
    """Try to resolve *test_source* inside *testcases_dir*.

    For test case input (config.testcase set, config.testlist not set)
    only "<source>.py" and "<source>.pyd" are accepted; otherwise the
    source itself or "<source>.json" is accepted. The resolved path is
    appended to *norm_test_sources*.

    Returns True when a file was found and appended, False otherwise.
    """
    # get norm_test_source
    norm_test_source = test_source
    if not os.path.isabs(test_source):
        norm_test_source = os.path.abspath(
            os.path.join(testcases_dir, test_source))

    # find py or pyd for test case input
    if config.testcase and not config.testlist:
        if os.path.isfile("%s%s" % (norm_test_source, PY_SUFFIX)):
            norm_test_sources.append(
                "%s%s" % (norm_test_source, PY_SUFFIX))
            return True
        elif os.path.isfile("%s%s" % (norm_test_source, PYD_SUFFIX)):
            norm_test_sources.append(
                "%s%s" % (norm_test_source, PYD_SUFFIX))
            return True
        return False

    # append to norm_test_sources
    if os.path.isfile(norm_test_source):
        norm_test_sources.append(norm_test_source)
        return True
    elif os.path.isfile("%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX)):
        norm_test_sources.append("%s%s" % (norm_test_source,
                                           MODULE_CONFIG_SUFFIX))
        return True
    return False


def _make_test_descriptor(file_path, test_type_key):
    """Create a Descriptor for *file_path* using the test type registered
    under *test_type_key* in TestDictSource.test_type.

    Returns None when *test_type_key* is None.
    """
    from _core.executor.request import Descriptor
    if test_type_key is None:
        return None

    # get params
    filename, _ = get_filename_extension(file_path)
    uid = unique_id("TestSource", filename)
    test_type = TestDictSource.test_type[test_type_key]
    config_file = _get_config_file(
        os.path.join(os.path.dirname(file_path), filename))

    module_name = _parse_module_name(config_file, filename)
    # make test descriptor
    desc = Descriptor(uuid=uid, name=filename,
                      source=TestSource(file_path, "", config_file, filename,
                                        test_type, module_name))
    return desc


def _get_test_driver(test_source):
    """Return the driver type that JsonParser reads from *test_source*.

    A ParamError raised while parsing is logged and "" is returned.
    """
    try:
        from _core.testkit.json_parser import JsonParser
        json_config = JsonParser(test_source)
        return json_config.get_driver_type()
    except ParamError as error:
        LOG.error(error, error_no=error.error_no)
        return ""


def _make_test_descriptors_from_testsources(test_sources, config):
    """Create a descriptor for each entry of *test_sources*.

    Entries for which _create_descriptor returns a falsy value are left
    out of the returned list.
    """
    test_descriptors = []

    for test_source in test_sources:
        # fallback name/extension for sources that are not existing files
        filename, ext = test_source.split()[0], "str"
        if os.path.isfile(test_source):
            filename, ext = get_filename_extension(test_source)

        test_driver = config.testdriver
        if is_config_str(test_source):
            test_driver = _get_test_driver(test_source)

        # get params
        config_file = _get_config_file(
            os.path.join(os.path.dirname(test_source), filename), ext, config)
        test_type = _get_test_type(config_file, test_driver, ext)

        desc = _create_descriptor(config_file, filename, test_source,
                                  test_type, config)
        if desc:
            test_descriptors.append(desc)

    return test_descriptors


def _create_descriptor(config_file, filename, test_source, test_type, config):
    """Create the Descriptor for one test source, or report why it cannot
    be executed.

    Outside ModeType.decc a missing test type yields None and a missing
    source file raises ParamError. In ModeType.decc any such error is
    reported through Scheduler.report_not_executed and None is returned.

    Raises:
        ParamError: when the source is neither a file nor a config string
            and the scheduler is not in decc mode.
    """
    from xdevice import Scheduler
    from _core.executor.request import Descriptor

    error_message = ""
    if not test_type:
        error_message = "no driver to execute '%s'" % test_source
        LOG.error(error_message, error_no="00112")
        if Scheduler.mode != ModeType.decc:
            return None

    # create Descriptor
    uid = unique_id("TestSource", filename)
    module_name = _parse_module_name(config_file, filename)
    desc = Descriptor(uuid=uid, name=filename,
                      source=TestSource(test_source, "", config_file,
                                        filename, test_type, module_name))
    if not os.path.isfile(test_source):
        if is_config_str(test_source):
            # the source is an inline config string, not a file path
            desc = Descriptor(uuid=uid, name=filename,
                              source=TestSource("", test_source, config_file,
                                                filename, test_type,
                                                module_name))
        else:
            if config.testcase and not config.testlist:
                error_message = "test case '%s' or '%s' not exists" % (
                    "%s%s" % (test_source, PY_SUFFIX), "%s%s" % (
                        test_source, PYD_SUFFIX))
                error_no = "00103"
            else:
                error_message = "test source '%s' or '%s' not exists" % (
                    test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX))
                error_no = "00102"
            if Scheduler.mode != ModeType.decc:
                raise ParamError(error_message, error_no=error_no)

    if Scheduler.mode == ModeType.decc and error_message:
        Scheduler.report_not_executed(config.report_path, [("", desc)],
                                      error_message)
        return None

    return desc


def _get_config_file(filename, ext=None, config=None):
    """Locate the JSON config file belonging to *filename*.

    Tries "<filename>.json", then "<filename><ext>.json" when *ext* is
    given. For test case input (config.testcase set, config.testlist not
    set) it falls back to _get_testcase_config_file.

    Returns the config file path, or None when nothing is found.
    """
    config_file = None
    if os.path.exists("%s%s" % (filename, MODULE_CONFIG_SUFFIX)):
        config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX)
        return config_file
    if ext and os.path.exists("%s%s%s" % (filename, ext,
                                          MODULE_CONFIG_SUFFIX)):
        config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX)
        return config_file
    # default on "testlist" too, so a config lacking the attribute does not
    # raise AttributeError
    if config and getattr(config, "testcase", "") and not getattr(
            config, "testlist", ""):
        return _get_testcase_config_file(filename)

    return config_file


def _get_testcase_config_file(filename):
    """Search the directory of *filename* and its ancestors for a *.json
    file and return the first one found, or None.

    At most MAX_DIR_DEPTH - 1 directory levels are visited.
    """
    depth = 1
    dirname = os.path.dirname(filename)
    while dirname and depth < MAX_DIR_DEPTH:
        # Skip levels that are not directories: os.listdir would raise and
        # hide the "not exists" error reported later for a bad test case.
        if os.path.isdir(dirname):
            for item in os.listdir(dirname):
                item_path = os.path.join(dirname, item)
                if os.path.isfile(item_path) and item.endswith(
                        MODULE_CONFIG_SUFFIX):
                    return item_path
        depth += 1
        dirname = os.path.dirname(dirname)
    return None


def _get_component_info_file(entry_dir):
    """Return the paths of all *.moduleInfo files directly inside
    *entry_dir*; an empty list when *entry_dir* is not a directory.
    """
    module_files = []
    if not os.path.isdir(entry_dir):
        return module_files
    for item in os.listdir(entry_dir):
        item_path = os.path.join(entry_dir, item)
        if os.path.isfile(item_path) and item_path.endswith(
                MODULE_INFO_SUFFIX):
            module_files.append(item_path)
    return module_files


def _get_test_type(config_file, test_driver, ext):
    """Determine the test type for a test source.

    Precedence: an explicit *test_driver*; the driver type read from
    *config_file* ("" when that file does not exist); otherwise a type
    derived from *ext* via TestDictSource.exe_type, defaulting to
    DeviceTestType.cpp_test.
    """
    if test_driver:
        return test_driver

    if config_file:
        if not os.path.exists(config_file):
            LOG.error("config file '%s' not exists" % config_file,
                      error_no="00110")
            return ""
        return _get_test_driver(config_file)
    if ext in [".py", ".js", ".dex", ".hap", ".bin"] \
            and ext in TestDictSource.exe_type:
        test_type = TestDictSource.exe_type[ext]
    elif ext in TestDictSource.exe_type:
        # registered extension outside the well-known list above
        test_type = DeviceTestType.hap_test
    else:
        test_type = DeviceTestType.cpp_test
    return test_type


def _parse_module_name(config_file, file_name):
    """Return the module name for a test source.

    This is the config file's base name when *config_file* is set,
    "report" when *file_name* contains "{", and *file_name* otherwise.
    """
    if config_file:
        return get_filename_extension(config_file)[0]
    else:
        if "{" in file_name:
            return "report"
        return file_name


class TestDictSource:
    """Holder of the class-level test type lookup tables."""
    # file extension -> test type; empty until reset() is called
    exe_type = dict()
    # test dict key -> test type; empty until reset() is called
    test_type = dict()

    @classmethod
    def reset(cls):
        """Restore both tables to deep copies of the module defaults."""
        cls.test_type = copy.deepcopy(TEST_TYPE_DICT)
        cls.exe_type = copy.deepcopy(EXT_TYPE_DICT)