1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import json 21import copy 22import stat 23from collections import namedtuple 24 25from _core.constants import DeviceTestType 26from _core.constants import ModeType 27from _core.constants import HostDrivenTestType 28from _core.constants import FilePermission 29from _core.constants import ConfigConst 30from _core.exception import ParamError 31from _core.logger import platform_logger 32from _core.utils import get_filename_extension 33from _core.utils import is_config_str 34from _core.utils import unique_id 35 36__all__ = ["TestSetSource", "TestSource", "find_test_descriptors", 37 "find_testdict_descriptors", "TestDictSource"] 38 39TestSetSource = namedtuple('TestSetSource', 'set') 40TestSource = namedtuple('TestSource', 'source_file source_string config_file ' 41 'test_name test_type module_name') 42 43TEST_TYPE_DICT = {"DEX": DeviceTestType.dex_test, 44 "HAP": DeviceTestType.hap_test, 45 "APK": DeviceTestType.hap_test, 46 "PYT": HostDrivenTestType.device_test, 47 "JST": DeviceTestType.jsunit_test, 48 "OHJST": DeviceTestType.oh_jsunit_test, 49 "CXX": DeviceTestType.cpp_test, 50 "BIN": DeviceTestType.lite_cpp_test, 51 "LTPPosix": DeviceTestType.ltp_posix_test} 52EXT_TYPE_DICT = {".dex": DeviceTestType.dex_test, 53 ".hap": DeviceTestType.hap_test, 54 ".apk": DeviceTestType.hap_test, 55 ".py": HostDrivenTestType.device_test, 56 ".js": DeviceTestType.jsunit_test, 57 ".bin": DeviceTestType.lite_cpp_test, 58 "default": DeviceTestType.cpp_test} 59PY_SUFFIX = ".py" 60PYD_SUFFIX = ".pyd" 61MODULE_CONFIG_SUFFIX = ".json" 62MODULE_INFO_SUFFIX = ".moduleInfo" 63MAX_DIR_DEPTH = 6 64LOG = platform_logger("TestSource") 65 66 67def find_test_descriptors(config): 68 if not config.testfile and not config.testlist and not config.task and \ 69 not config.testcase and not config.subsystems and \ 70 not config.parts: 71 return None 72 73 # get test sources 74 testcases_dirs = _get_testcases_dirs(config) 75 test_sources = _get_test_sources(config, testcases_dirs) 76 LOG.debug("Test sources: %s", test_sources) 77 78 # normalize test sources 79 test_sources = _normalize_test_sources(testcases_dirs, test_sources, 80 config) 81 82 # make test descriptors 83 test_descriptors = _make_test_descriptors_from_testsources(test_sources, 84 config) 85 return test_descriptors 86 87 88def _get_testcases_dirs(config): 89 from xdevice import Variables 90 # add config.testcases_path and its subfolders 91 testcases_dirs = [] 92 if getattr(config, ConfigConst.testcases_path, ""): 93 testcases_dirs = [config.testcases_path] 94 _append_subfolders(config.testcases_path, testcases_dirs) 95 96 # add inner testcases dir and its subfolders 97 inner_testcases_dir = os.path.abspath(os.path.join( 98 Variables.top_dir, "testcases")) 99 if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase( 100 config.testcases_path) != os.path.normcase(inner_testcases_dir): 101 testcases_dirs.append(inner_testcases_dir) 102 _append_subfolders(inner_testcases_dir, testcases_dirs) 103 104 # add execution dir and top dir 105 testcases_dirs.append(Variables.exec_dir) 106 if os.path.normcase(Variables.exec_dir) != os.path.normcase( 107 Variables.top_dir): 108 testcases_dirs.append(Variables.top_dir) 109 110 LOG.debug("Testcases directories: %s", testcases_dirs) 111 return testcases_dirs 112 113 114def _append_subfolders(testcases_path, testcases_dirs): 115 for root, dirs, _ in os.walk(testcases_path): 116 for sub_dir in dirs: 117 testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir))) 118 119 120def find_testdict_descriptors(config): 121 from xdevice import Variables 122 if getattr(config, ConfigConst.testdict, "") == "": 123 return None 124 testdict = config.testdict 125 test_descriptors = [] 126 for test_type_key, files in testdict.items(): 127 for file_name in files: 128 if not os.path.isabs(file_name): 129 file_name = os.path.join(Variables.exec_dir, file_name) 130 if os.path.isfile(file_name) and test_type_key in \ 131 TestDictSource.test_type.keys(): 132 desc = _make_test_descriptor(os.path.abspath(file_name), 133 test_type_key) 134 if desc is not None: 135 test_descriptors.append(desc) 136 if not test_descriptors: 137 raise ParamError("test source is none", error_no="00110") 138 return test_descriptors 139 140 141def _append_component_test_source(config, testcases_dir, test_sources): 142 subsystem_list = config.subsystems if config.subsystems else list() 143 part_list = config.parts if config.parts else list() 144 module_info_files = _get_component_info_file(testcases_dir) 145 result_dict = dict() 146 for info_file in module_info_files: 147 flags = os.O_RDONLY 148 modes = stat.S_IWUSR | stat.S_IRUSR 149 with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler: 150 result_dict.update(json.load(f_handler)) 151 module_name = result_dict.get("module", "") 152 part_name = result_dict.get("part", "") 153 subsystem_name = result_dict.get("subsystem", "") 154 if not module_name or not part_name or not subsystem_name: 155 continue 156 module_config_file = \ 157 os.path.join(os.path.dirname(info_file), module_name) 158 is_append = True 159 if subsystem_list or part_list: 160 if part_name not in part_list and \ 161 subsystem_name not in subsystem_list: 162 is_append = False 163 if is_append: 164 getattr(config, ConfigConst.component_mapper, dict()).update( 165 {module_name: (subsystem_name, part_name)}) 166 test_sources.append(module_config_file) 167 168 169def _get_test_sources(config, testcases_dirs): 170 test_sources = [] 171 172 # get test sources from testcases_dirs 173 if not config.testfile and not config.testlist and not config.testcase \ 174 and not config.subsystems and not config.parts and not \ 175 getattr(config, ConfigConst.component_base_kit, "") and \ 176 config.task: 177 for testcases_dir in testcases_dirs: 178 _append_module_test_source(testcases_dir, test_sources) 179 return test_sources 180 181 # get test sources from config.testlist 182 if getattr(config, ConfigConst.testlist, ""): 183 for test_source in config.testlist.split(";"): 184 if test_source.strip(): 185 test_sources.append(test_source.strip()) 186 return test_sources 187 188 # get test sources from config.testfile 189 if getattr(config, ConfigConst.testfile, ""): 190 test_file = _get_test_file(config, testcases_dirs) 191 flags = os.O_RDONLY 192 modes = stat.S_IWUSR | stat.S_IRUSR 193 with os.fdopen(os.open(test_file, flags, modes), "r") as file_content: 194 if str(test_file).endswith(".json"): 195 content = file_content.read() 196 source_list, case_dict = parse_source_from_data(content) 197 test_sources.extend(source_list) 198 config.tf_suite = case_dict 199 else: 200 for line in file_content: 201 if line.strip(): 202 test_sources.append(line.strip()) 203 204 # get test sources from config.testcase 205 if getattr(config, ConfigConst.testcase, ""): 206 for test_source in config.testcase.split(";"): 207 if test_source.strip(): 208 test_sources.append(test_source.strip()) 209 return test_sources 210 211 if getattr(config, ConfigConst.subsystems, []) or \ 212 getattr(config, ConfigConst.parts, []) or \ 213 getattr(config, ConfigConst.component_base_kit, ""): 214 setattr(config, ConfigConst.component_mapper, dict()) 215 for testcases_dir in testcases_dirs: 216 _append_component_test_source(config, testcases_dir, test_sources) 217 return test_sources 218 return test_sources 219 220 221def _append_module_test_source(testcases_path, test_sources): 222 if not os.path.isdir(testcases_path): 223 return 224 for item in os.listdir(testcases_path): 225 item_path = os.path.join(testcases_path, item) 226 if os.path.isfile(item_path) and item_path.endswith( 227 MODULE_CONFIG_SUFFIX): 228 test_sources.append(item_path) 229 230 231def _get_test_file(config, testcases_dirs): 232 if os.path.isabs(config.testfile): 233 if os.path.exists(config.testfile): 234 return config.testfile 235 else: 236 raise ParamError("test file '%s' not exists" % config.testfile, 237 error_no="00110") 238 239 for testcases_dir in testcases_dirs: 240 test_file = os.path.join(testcases_dir, config.testfile) 241 if os.path.exists(test_file): 242 return test_file 243 244 raise ParamError("test file '%s' not exists" % config.testfile) 245 246 247def _normalize_test_sources(testcases_dirs, test_sources, config): 248 norm_test_sources = [] 249 for test_source in test_sources: 250 append_result = False 251 for testcases_dir in testcases_dirs: 252 # append test source absolute path 253 append_result = _append_norm_test_source( 254 norm_test_sources, test_source, testcases_dir, config) 255 if append_result: 256 break 257 258 # append test source if no corresponding file founded 259 if not append_result: 260 norm_test_sources.append(test_source) 261 if not norm_test_sources: 262 raise ParamError("test source not found") 263 return norm_test_sources 264 265 266def _append_norm_test_source(norm_test_sources, test_source, testcases_dir, 267 config): 268 # get norm_test_source 269 norm_test_source = test_source 270 if not os.path.isabs(test_source): 271 norm_test_source = os.path.abspath( 272 os.path.join(testcases_dir, test_source)) 273 274 # find py or pyd for test case input 275 if config.testcase and not config.testlist: 276 if os.path.isfile("%s%s" % (norm_test_source, PY_SUFFIX)): 277 norm_test_sources.append( 278 "%s%s" % (norm_test_source, PY_SUFFIX)) 279 return True 280 elif os.path.isfile("%s%s" % (norm_test_source, PYD_SUFFIX)): 281 norm_test_sources.append( 282 "%s%s" % (norm_test_source, PYD_SUFFIX)) 283 return True 284 return False 285 286 # append to norm_test_sources 287 if os.path.isfile(norm_test_source): 288 norm_test_sources.append(norm_test_source) 289 return True 290 elif os.path.isfile("%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX)): 291 norm_test_sources.append("%s%s" % (norm_test_source, 292 MODULE_CONFIG_SUFFIX)) 293 return True 294 return False 295 296 297def _make_test_descriptor(file_path, test_type_key): 298 from _core.executor.request import Descriptor 299 if test_type_key is None: 300 return None 301 302 # get params 303 filename, _ = get_filename_extension(file_path) 304 uid = unique_id("TestSource", filename) 305 test_type = TestDictSource.test_type[test_type_key] 306 config_file = _get_config_file( 307 os.path.join(os.path.dirname(file_path), filename)) 308 309 module_name = _parse_module_name(config_file, filename) 310 # make test descriptor 311 desc = Descriptor(uuid=uid, name=filename, 312 source=TestSource(file_path, "", config_file, filename, 313 test_type, module_name)) 314 return desc 315 316 317def _get_test_driver(test_source): 318 try: 319 from _core.testkit.json_parser import JsonParser 320 json_config = JsonParser(test_source) 321 return json_config.get_driver_type() 322 except ParamError as error: 323 LOG.error(error, error_no=error.error_no) 324 return "" 325 326 327def _make_test_descriptors_from_testsources(test_sources, config): 328 test_descriptors = [] 329 330 for test_source in test_sources: 331 filename, ext = test_source.split()[0], "str" 332 if os.path.isfile(test_source): 333 filename, ext = get_filename_extension(test_source) 334 335 test_driver = config.testdriver 336 if is_config_str(test_source): 337 test_driver = _get_test_driver(test_source) 338 339 # get params 340 config_file = _get_config_file( 341 os.path.join(os.path.dirname(test_source), filename), ext, config) 342 test_type = _get_test_type(config_file, test_driver, ext) 343 if not config_file: 344 if getattr(config, ConfigConst.testcase, "") and not \ 345 getattr(config, ConfigConst.testlist): 346 LOG.debug("Can't find the json file of config") 347 from xdevice import Scheduler 348 if Scheduler.device_labels: 349 config_file, test_type = _generate_config_file( 350 Scheduler.device_labels, 351 os.path.join(os.path.dirname(test_source), filename), 352 ext, test_type) 353 setattr(Scheduler, "tmp_json", config_file) 354 LOG.debug("Generate temp json success: %s" % config_file) 355 desc = _create_descriptor(config_file, filename, test_source, 356 test_type, config) 357 if desc: 358 test_descriptors.append(desc) 359 360 return test_descriptors 361 362 363def _create_descriptor(config_file, filename, test_source, test_type, config): 364 from xdevice import Scheduler 365 from _core.executor.request import Descriptor 366 367 error_message = "" 368 if not test_type: 369 error_message = "no driver to execute '%s'" % test_source 370 LOG.error(error_message, error_no="00112") 371 if Scheduler.mode != ModeType.decc: 372 return None 373 374 # create Descriptor 375 uid = unique_id("TestSource", filename) 376 module_name = _parse_module_name(config_file, filename) 377 desc = Descriptor(uuid=uid, name=filename, 378 source=TestSource(test_source, "", config_file, 379 filename, test_type, module_name)) 380 if not os.path.isfile(test_source): 381 if is_config_str(test_source): 382 desc = Descriptor(uuid=uid, name=filename, 383 source=TestSource("", test_source, config_file, 384 filename, test_type, 385 module_name)) 386 else: 387 if config.testcase and not config.testlist: 388 error_message = "test case '%s' or '%s' not exists" % ( 389 "%s%s" % (test_source, PY_SUFFIX), "%s%s" % ( 390 test_source, PYD_SUFFIX)) 391 error_no = "00103" 392 else: 393 error_message = "test source '%s' or '%s' not exists" % ( 394 test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX)) 395 error_no = "00102" 396 desc.error = ParamError(error_message, error_no=error_no) 397 398 if Scheduler.mode == ModeType.decc and error_message: 399 Scheduler.report_not_executed(config.report_path, [("", desc)], 400 error_message) 401 return None 402 403 return desc 404 405 406def _get_config_file(filename, ext=None, config=None): 407 config_file = None 408 if os.path.exists("%s%s" % (filename, MODULE_CONFIG_SUFFIX)): 409 config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX) 410 return config_file 411 if ext and os.path.exists("%s%s%s" % (filename, ext, 412 MODULE_CONFIG_SUFFIX)): 413 config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX) 414 return config_file 415 if config and getattr(config, "testcase", "") and not getattr( 416 config, "testlist"): 417 return _get_testcase_config_file(filename) 418 419 return config_file 420 421 422def _get_testcase_config_file(filename): 423 depth = 1 424 dirname = os.path.dirname(filename) 425 while dirname and depth < MAX_DIR_DEPTH: 426 for item in os.listdir(dirname): 427 item_path = os.path.join(dirname, item) 428 if os.path.isfile(item_path) and item.endswith( 429 MODULE_CONFIG_SUFFIX): 430 return item_path 431 depth += 1 432 dirname = os.path.dirname(dirname) 433 return None 434 435 436def _get_component_info_file(entry_dir): 437 module_files = [] 438 if not os.path.isdir(entry_dir): 439 return module_files 440 for item in os.listdir(entry_dir): 441 item_path = os.path.join(entry_dir, item) 442 if os.path.isfile(item_path) and item_path.endswith( 443 MODULE_INFO_SUFFIX): 444 module_files.append(item_path) 445 return module_files 446 447 448def _get_test_type(config_file, test_driver, ext): 449 if test_driver: 450 return test_driver 451 452 if config_file: 453 if not os.path.exists(config_file): 454 LOG.error("Config file '%s' not exists" % config_file, 455 error_no="00110") 456 return "" 457 return _get_test_driver(config_file) 458 if ext in [".py", ".js", ".dex", ".hap", ".bin"] \ 459 and ext in TestDictSource.exe_type.keys(): 460 test_type = TestDictSource.exe_type[ext] 461 elif ext in [".apk"] and ext in TestDictSource.exe_type.keys(): 462 test_type = DeviceTestType.hap_test 463 else: 464 test_type = DeviceTestType.cpp_test 465 return test_type 466 467 468def _parse_module_name(config_file, file_name): 469 if config_file: 470 return get_filename_extension(config_file)[0] 471 else: 472 if "{" in file_name: 473 return "report" 474 return file_name 475 476 477def _generate_config_file(device_labels, filename, ext, test_type): 478 if test_type not in [HostDrivenTestType.device_test]: 479 test_type = HostDrivenTestType.device_test 480 top_dict = {"environment": [], "driver": {"type": test_type, 481 "py_file": "%s%s" % (filename, ext)}} 482 for label in device_labels: 483 device_json_list = top_dict.get("environment") 484 device_json_list.append({"type": "device", "label": label}) 485 486 save_file = os.path.join(os.path.dirname(filename), 487 "%s.json" % os.path.basename(filename)) 488 save_file_open = \ 489 os.open(save_file, os.O_WRONLY | os.O_CREAT, FilePermission.mode_755) 490 with os.fdopen(save_file_open, "w") as save_handler: 491 save_handler.write(json.dumps(top_dict, indent=4)) 492 return save_file, test_type 493 494 495def parse_source_from_data(content): 496 source_list = list() 497 data = dict(json.loads(content)) 498 case_dict = dict() 499 for item in data.get("suite", list()): 500 name = item.get("module_name", "") 501 case_dict.update({name: item}) 502 source_list.append(name) 503 return source_list, case_dict 504 505class TestDictSource: 506 exe_type = copy.deepcopy(EXT_TYPE_DICT) 507 test_type = copy.deepcopy(TEST_TYPE_DICT) 508 509 @classmethod 510 def reset(cls): 511 cls.test_type = copy.deepcopy(TEST_TYPE_DICT) 512 cls.exe_type = copy.deepcopy(EXT_TYPE_DICT) 513 514 @classmethod 515 def clear(cls): 516 cls.test_type.clear() 517 cls.exe_type.clear() 518