#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Locate test sources and turn them into test descriptors."""

import os
import json
import copy
import stat
from collections import namedtuple

from _core.constants import DeviceTestType
from _core.constants import SchedulerType
from _core.constants import ModeType
from _core.constants import HostDrivenTestType
from _core.constants import FilePermission
from _core.constants import ConfigConst
from _core.error import ErrorMessage
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.testkit.json_parser import JsonParser
from _core.utils import get_filename_extension
from _core.utils import is_config_str
from _core.utils import unique_id

from _core.context.handler import report_not_executed
from _core.context.center import Context
from _core.context.upload import Uploader

__all__ = ["TestSetSource", "TestSource", "find_test_descriptors",
           "find_testdict_descriptors", "TestDictSource"]

TestSetSource = namedtuple('TestSetSource', 'set')
# source_file:   path of the test file ("" when the source is a config string)
# source_string: inline config string ("" when the source is a file)
TestSource = namedtuple('TestSource', 'source_file source_string config_file '
                                      'test_name test_type module_name '
                                      'module_subsystem')

# Test type keys accepted in config.testdict, mapped to the driver type.
TEST_TYPE_DICT = {"DEX": DeviceTestType.dex_test,
                  "HAP": DeviceTestType.hap_test,
                  "APK": DeviceTestType.hap_test,
                  "PYT": HostDrivenTestType.device_test,
                  "JST": DeviceTestType.jsunit_test,
                  "OHJST": DeviceTestType.oh_jsunit_test,
                  "CXX": DeviceTestType.cpp_test,
                  "BIN": DeviceTestType.lite_cpp_test}
# File extensions mapped to the driver type used when nothing else decides it.
EXT_TYPE_DICT = {".dex": DeviceTestType.dex_test,
                 ".hap": DeviceTestType.hap_test,
                 ".apk": DeviceTestType.hap_test,
                 ".py": HostDrivenTestType.device_test,
                 ".js": DeviceTestType.jsunit_test,
                 ".bin": DeviceTestType.lite_cpp_test,
                 "default": DeviceTestType.cpp_test}
PY_SUFFIX = ".py"
PYD_SUFFIX = ".pyd"
MODULE_CONFIG_SUFFIX = ".json"
MODULE_INFO_SUFFIX = ".moduleInfo"
# Bound for the upward directory search in _get_testcase_config_file; the
# search starts at depth 1, so at most MAX_DIR_DEPTH - 1 levels are inspected.
MAX_DIR_DEPTH = 6
# Placeholder used when no module subsystem can be determined.
NO_MODULE_SUBSYSTEM = "zzzzzzzz"
LOG = platform_logger("TestSource")


def find_test_descriptors(config):
    """Build the test descriptors selected by the given configuration.

    Args:
        config: run configuration; the attributes testfile, testlist, task,
            testcase, subsystems and parts are read here.

    Returns:
        None when none of the attributes above is set, otherwise the list
        produced by _make_test_descriptors_from_testsources.

    Raises:
        ParamError: propagated from the helpers, e.g. when no test source
            is found or the test file does not exist.
    """
    if not config.testfile and not config.testlist and not config.task and \
            not config.testcase and not config.subsystems and \
            not config.parts:
        return None

    # get test sources
    testcases_dirs = _get_testcases_dirs(config)
    test_sources = _get_test_sources(config, testcases_dirs)
    LOG.debug("Test sources: %s", test_sources)

    # normalize test sources
    test_sources = _normalize_test_sources(testcases_dirs, test_sources,
                                           config)

    # make test descriptors
    test_descriptors = _make_test_descriptors_from_testsources(test_sources,
                                                               config)
    return test_descriptors


def _get_testcases_dirs(config):
    """Return the ordered list of directories searched for test sources.

    Order: config.testcases_path and its subfolders, the "testcases" folder
    under Variables.top_dir and its subfolders (only when testcases_path is
    set and differs from it), Variables.exec_dir, then Variables.top_dir if
    it differs from exec_dir.
    """
    # NOTE(review): imported locally, presumably to avoid a circular import
    from xdevice import Variables
    # add config.testcases_path and its subfolders
    testcases_dirs = []
    if getattr(config, ConfigConst.testcases_path, ""):
        testcases_dirs = [config.testcases_path]
        _append_subfolders(config.testcases_path, testcases_dirs)

    # add inner testcases dir and its subfolders
    inner_testcases_dir = os.path.abspath(os.path.join(
        Variables.top_dir, "testcases"))
    if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase(
            config.testcases_path) != os.path.normcase(inner_testcases_dir):
        testcases_dirs.append(inner_testcases_dir)
        _append_subfolders(inner_testcases_dir, testcases_dirs)

    # add execution dir and top dir
    testcases_dirs.append(Variables.exec_dir)
    if os.path.normcase(Variables.exec_dir) != os.path.normcase(
            Variables.top_dir):
        testcases_dirs.append(Variables.top_dir)

    LOG.debug("Testcases directories: %s", testcases_dirs)
    return testcases_dirs


def _append_subfolders(testcases_path, testcases_dirs):
    """Append the absolute path of every subfolder of testcases_path.

    Folders named in ConfigConst.ignore_testcases_path ("|"-separated) are
    neither added nor descended into.
    """
    ignore_folders = ConfigConst.ignore_testcases_path.split("|")
    for root, dirs, _ in os.walk(testcases_path):
        # prune in place so that os.walk skips the ignored folders
        dirs[:] = [d for d in dirs if d not in ignore_folders]
        for sub_dir in dirs:
            testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir)))


def find_testdict_descriptors(config):
    """Build test descriptors from config.testdict.

    config.testdict maps a test type key to an iterable of file names;
    relative names are resolved against Variables.exec_dir. Entries whose
    file does not exist or whose key is not in TestDictSource.test_type
    are ignored.

    Returns:
        None when config.testdict is missing or "", otherwise a non-empty
        list of descriptors.

    Raises:
        ParamError: when no descriptor could be created.
    """
    from xdevice import Variables
    if getattr(config, ConfigConst.testdict, "") == "":
        return None
    testdict = config.testdict
    test_descriptors = []
    for test_type_key, files in testdict.items():
        for file_name in files:
            if not os.path.isabs(file_name):
                file_name = os.path.join(Variables.exec_dir, file_name)
            if os.path.isfile(file_name) and \
                    test_type_key in TestDictSource.test_type:
                desc = _make_test_descriptor(os.path.abspath(file_name),
                                             test_type_key)
                if desc is not None:
                    test_descriptors.append(desc)
    if not test_descriptors:
        raise ParamError(ErrorMessage.Common.Code_0101023)
    return test_descriptors


def _append_component_test_source(config, testcases_dir, test_sources):
    """Append the module sources described by *.moduleInfo files.

    Every .moduleInfo file directly inside testcases_dir is read as JSON.
    A file is used only when it provides non-empty "module", "part" and
    "subsystem" values. When config.subsystems or config.parts is set, the
    module is kept only if its part or its subsystem is listed. For each
    kept module the (subsystem, part) pair is recorded in the component
    mapper of config and "<dir of info file>/<module>" is appended to
    test_sources.
    """
    subsystem_list = config.subsystems if config.subsystems else list()
    part_list = config.parts if config.parts else list()
    for info_file in _get_component_info_file(testcases_dir):
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
            # Parse each file into its own dict: merging into one shared
            # dict let values of an earlier file leak into a later file
            # that lacks the corresponding key.
            module_info = json.load(f_handler)
        if not isinstance(module_info, dict):
            LOG.warning("module info '%s' is not a json object" % info_file)
            continue
        module_name = module_info.get("module", "")
        part_name = module_info.get("part", "")
        subsystem_name = module_info.get("subsystem", "")
        if not module_name or not part_name or not subsystem_name:
            continue
        if (subsystem_list or part_list) and \
                part_name not in part_list and \
                subsystem_name not in subsystem_list:
            continue
        module_config_file = \
            os.path.join(os.path.dirname(info_file), module_name)
        getattr(config, ConfigConst.component_mapper, dict()).update(
            {module_name: (subsystem_name, part_name)})
        test_sources.append(module_config_file)


def _get_test_sources(config, testcases_dirs):
    """Collect the raw test sources requested by the configuration.

    The selectors are evaluated in this order:
      1. only config.task set: every module config file found directly in
         testcases_dirs;
      2. config.testlist: the ";"-separated entries;
      3. config.testfile: module names of a .json file (also stored in
         config.tf_suite) or the non-blank lines of a text file; this step
         does not return, so config.testcase entries may be added to it;
      4. config.testcase: the ";"-separated entries;
      5. subsystems / parts / component_base_kit: component test sources.

    Raises:
        ParamError: when config.testfile cannot be located.
    """
    test_sources = []

    # get test sources from testcases_dirs
    if not config.testfile and not config.testlist and not config.testcase \
            and not config.subsystems and not config.parts and not \
            getattr(config, ConfigConst.component_base_kit, "") and \
            config.task:
        for testcases_dir in testcases_dirs:
            _append_module_test_source(testcases_dir, test_sources)
        return test_sources

    # get test sources from config.testlist
    if getattr(config, ConfigConst.testlist, ""):
        for test_source in config.testlist.split(";"):
            if test_source.strip():
                test_sources.append(test_source.strip())
        return test_sources

    # get test sources from config.testfile
    if getattr(config, ConfigConst.testfile, ""):
        test_file = _get_test_file(config, testcases_dirs)
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(test_file, flags, modes), "r") as file_content:
            if str(test_file).endswith(".json"):
                content = file_content.read()
                source_list, case_dict = parse_source_from_data(content)
                test_sources.extend(source_list)
                config.tf_suite = case_dict
            else:
                for line in file_content:
                    if line.strip():
                        test_sources.append(line.strip())

    # get test sources from config.testcase
    if getattr(config, ConfigConst.testcase, ""):
        for test_source in config.testcase.split(";"):
            if test_source.strip():
                test_sources.append(test_source.strip())
        return test_sources

    # get test sources from component information
    if getattr(config, ConfigConst.subsystems, []) or \
            getattr(config, ConfigConst.parts, []) or \
            getattr(config, ConfigConst.component_base_kit, ""):
        setattr(config, ConfigConst.component_mapper, dict())
        for testcases_dir in testcases_dirs:
            _append_component_test_source(config, testcases_dir, test_sources)
        return test_sources
    return test_sources


def _append_module_test_source(testcases_path, test_sources):
    """Append every module config file found directly in testcases_path.

    Files whose base name starts or ends with whitespace are reported with
    a warning and skipped. Nothing happens when testcases_path is not a
    directory.
    """
    if not os.path.isdir(testcases_path):
        return
    for item in os.listdir(testcases_path):
        if not item.endswith(MODULE_CONFIG_SUFFIX):
            continue
        item_path = os.path.join(testcases_path, item)
        if not os.path.isfile(item_path):
            continue
        # strip the suffix by its real length instead of a magic number
        name = item[:-len(MODULE_CONFIG_SUFFIX)]
        if len(name) != len(name.strip()):
            LOG.warning(f"test source '{item}' contains spaces at the beginning or end")
        else:
            test_sources.append(item_path)


def _get_test_file(config, testcases_dirs):
    """Resolve config.testfile to an existing path.

    An absolute path is used as is; a relative one is looked up in each of
    testcases_dirs, the first existing match wins.

    Raises:
        ParamError: when the file cannot be found.
    """
    if os.path.isabs(config.testfile):
        if os.path.exists(config.testfile):
            return config.testfile
        raise ParamError(
            ErrorMessage.Common.Code_0101024.format(config.testfile))

    for testcases_dir in testcases_dirs:
        test_file = os.path.join(testcases_dir, config.testfile)
        if os.path.exists(test_file):
            return test_file

    raise ParamError(ErrorMessage.Common.Code_0101024.format(config.testfile))


def _normalize_test_sources(testcases_dirs, test_sources, config):
    """Replace each test source by the path of the file it refers to.

    Each source is resolved against testcases_dirs in order; the first
    directory that yields an existing file wins. A source that cannot be
    resolved is kept unchanged.

    Raises:
        ParamError: when the resulting list is empty.
    """
    norm_test_sources = []
    for test_source in test_sources:
        append_result = False
        for testcases_dir in testcases_dirs:
            # append test source absolute path
            append_result = _append_norm_test_source(
                norm_test_sources, test_source, testcases_dir, config)
            if append_result:
                break

        # append test source if no corresponding file found
        if not append_result:
            norm_test_sources.append(test_source)
    if not norm_test_sources:
        raise ParamError(ErrorMessage.Common.Code_0101023)
    return norm_test_sources


def _append_norm_test_source(norm_test_sources, test_source, testcases_dir,
                             config):
    """Resolve test_source inside testcases_dir and append the match.

    With config.testcase set and config.testlist unset only
    "<source>.py" and "<source>.pyd" are tried; otherwise the source
    itself and "<source>.json" are tried.

    Returns:
        True when an existing file was appended, False otherwise.
    """
    # get norm_test_source
    norm_test_source = test_source
    if not os.path.isabs(test_source):
        norm_test_source = os.path.abspath(
            os.path.join(testcases_dir, test_source))

    # find py or pyd for test case input
    if config.testcase and not config.testlist:
        candidates = ("%s%s" % (norm_test_source, PY_SUFFIX),
                      "%s%s" % (norm_test_source, PYD_SUFFIX))
    else:
        candidates = (norm_test_source,
                      "%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX))

    # append to norm_test_sources
    for candidate in candidates:
        if os.path.isfile(candidate):
            norm_test_sources.append(candidate)
            return True
    return False


def _make_test_descriptor(file_path, test_type_key):
    """Create a descriptor for one file listed in config.testdict.

    Returns:
        None when test_type_key is None, otherwise a Descriptor whose
        source is a TestSource for file_path.
    """
    from _core.executor.request import Descriptor
    if test_type_key is None:
        return None

    # get params
    filename, _ = get_filename_extension(file_path)
    uid = unique_id("TestSource", filename)
    test_type = TestDictSource.test_type.get(test_type_key)
    config_file = _get_config_file(
        os.path.join(os.path.dirname(file_path), filename))

    module_info = ""
    if config_file:
        module_info = _get_module_info(config_file)

    module_name = _parse_module_name(config_file, filename)
    # make test descriptor
    desc = Descriptor(uuid=uid, name=filename,
                      source=TestSource(file_path, "", config_file, filename,
                                        test_type, module_name, module_info))
    return desc


def _get_test_driver(test_source):
    """Return the driver type declared by a json config, "" on ParamError."""
    try:
        json_config = JsonParser(test_source)
        return json_config.get_driver_type()
    except ParamError as error:
        LOG.error(error, error_no=error.error_no)
        return ""


def _make_test_descriptors_from_testsources(test_sources, config):
    """Create one descriptor per test source.

    For each source the config file and the test type are determined. When
    no config file exists, a test case was requested (config.testcase set,
    config.testlist unset) and the session has device labels, a temporary
    json config is generated and remembered on Uploader.tmp_json. Sources
    for which _create_descriptor returns nothing are dropped.
    """
    test_descriptors = []

    for test_source in test_sources:
        filename, ext = test_source.split()[0], "str"
        if os.path.isfile(test_source):
            filename, ext = get_filename_extension(test_source)

        test_driver = config.testdriver
        if is_config_str(test_source):
            test_driver = _get_test_driver(test_source)

        # get params
        source_base = os.path.join(os.path.dirname(test_source), filename)
        config_file = _get_config_file(source_base, ext, config)
        test_type = _get_test_type(config_file, test_driver, ext)
        if not config_file:
            if getattr(config, ConfigConst.testcase, "") and not \
                    getattr(config, ConfigConst.testlist, ""):
                LOG.debug("Can't find the json file of config")
                if Context.session().device_labels:
                    config_file, test_type = _generate_config_file(
                        Context.session().device_labels, source_base,
                        ext, test_type)
                    setattr(Uploader, "tmp_json", config_file)
                    LOG.debug("Generate temp json success: %s", config_file)
        module_info = NO_MODULE_SUBSYSTEM
        if config.scheduler == SchedulerType.module and config_file:
            module_info = _get_module_info(config_file)
        desc = _create_descriptor(config_file, filename, test_source,
                                  test_type, config, module_info)
        if desc:
            test_descriptors.append(desc)

    return test_descriptors


def _get_module_info(config_file):
    """Return the module subsystem stored next to a module config file.

    The module info file has the same path as config_file with the
    trailing ".json" replaced by ".moduleInfo".

    Returns:
        The value of JsonParser.get_module_subsystem(), or
        NO_MODULE_SUBSYSTEM when config_file has no ".json" suffix, the
        info file does not exist or parsing raises ParamError.
    """
    if not config_file.endswith(MODULE_CONFIG_SUFFIX):
        return NO_MODULE_SUBSYSTEM
    # Swap only the trailing suffix; str.replace would also rewrite any
    # ".json" that happens to occur in a directory name.
    module_info_path = "%s%s" % (
        config_file[:-len(MODULE_CONFIG_SUFFIX)], MODULE_INFO_SUFFIX)
    if not os.path.exists(module_info_path):
        return NO_MODULE_SUBSYSTEM
    try:
        json_config = JsonParser(module_info_path)
        return json_config.get_module_subsystem()
    except ParamError as error:
        LOG.warning(error, error_no=error.error_no)
        return NO_MODULE_SUBSYSTEM


def _create_descriptor(config_file, filename, test_source, test_type, config,
                       module_info):
    """Create the descriptor of one test source.

    Returns:
        The Descriptor, or None when no test type is known outside decc
        mode, or when an error occurred in decc mode (the source is then
        reported as not executed). Outside decc mode a missing test file
        is recorded on desc.error and the descriptor is still returned.
    """
    from _core.executor.request import Descriptor

    error_message = ""
    if not test_type:
        error_message = "no driver to execute '%s'" % test_source
        LOG.error(error_message, error_no="00112")
        if Context.session().mode != ModeType.decc:
            return None

    # create Descriptor
    uid = unique_id("TestSource", filename)
    module_name = _parse_module_name(config_file, filename)
    desc = Descriptor(uuid=uid, name=filename,
                      source=TestSource(test_source, "", config_file,
                                        filename, test_type, module_name,
                                        module_info))
    if not os.path.isfile(test_source):
        if is_config_str(test_source):
            # the source is an inline config string, not a file
            desc = Descriptor(uuid=uid, name=filename,
                              source=TestSource("", test_source, config_file,
                                                filename, test_type,
                                                module_name, module_info))
        else:
            if config.testcase and not config.testlist:
                error_message = "test case '%s' or '%s' not exists" % (
                    "%s%s" % (test_source, PY_SUFFIX), "%s%s" % (
                        test_source, PYD_SUFFIX))
                error_no = "00103"
            else:
                error_message = "{}".format(test_source)
                error_no = "00102"
            desc.error = ParamError(error_message, error_no=error_no)

    if Context.session().mode == ModeType.decc and error_message:
        report_not_executed(config.report_path, [("", desc)], error_message)
        return None

    return desc


def _get_config_file(filename, ext=None, config=None):
    """Find the json config file belonging to a test source.

    Tried in order: "<filename>.json", "<filename><ext>.json" and, when a
    test case was requested (config.testcase set, config.testlist unset),
    the first json file found while walking up from the source directory.

    Returns:
        The config file path, or None when nothing was found.
    """
    config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX)
    if os.path.exists(config_file):
        return config_file
    if ext:
        config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX)
        if os.path.exists(config_file):
            return config_file
    if config and getattr(config, "testcase", "") and not getattr(
            config, "testlist", ""):
        return _get_testcase_config_file(filename)

    return None


def _get_testcase_config_file(filename):
    """Search upwards from the directory of filename for a json file.

    Starting in the directory of filename and moving to the parent each
    round, the first regular file ending with ".json" is returned. At most
    MAX_DIR_DEPTH - 1 directories are inspected.

    Returns:
        The path of the json file, or None when none was found.
    """
    depth = 1
    dirname = os.path.dirname(filename)
    while dirname and depth < MAX_DIR_DEPTH:
        # The test case may name a folder that does not exist; listing it
        # would raise, so such levels are skipped.
        if os.path.isdir(dirname):
            for item in os.listdir(dirname):
                item_path = os.path.join(dirname, item)
                if os.path.isfile(item_path) and item.endswith(
                        MODULE_CONFIG_SUFFIX):
                    return item_path
        depth += 1
        dirname = os.path.dirname(dirname)
    return None


def _get_component_info_file(entry_dir):
    """Return the *.moduleInfo files located directly in entry_dir.

    An empty list is returned when entry_dir is not a directory.
    """
    module_files = []
    if not os.path.isdir(entry_dir):
        return module_files
    for item in os.listdir(entry_dir):
        item_path = os.path.join(entry_dir, item)
        if os.path.isfile(item_path) and item_path.endswith(
                MODULE_INFO_SUFFIX):
            module_files.append(item_path)
    return module_files


def _get_test_type(config_file, test_driver, ext):
    """Determine the driver type of a test source.

    Precedence: the explicit test_driver, the driver declared in
    config_file ("" when that file does not exist), the extension mapping
    in TestDictSource.exe_type, and finally DeviceTestType.cpp_test.
    """
    if test_driver:
        return test_driver

    if config_file:
        if not os.path.exists(config_file):
            LOG.error("Config file '%s' not exists" % config_file,
                      error_no="00110")
            return ""
        return _get_test_driver(config_file)
    if ext in [".py", ".js", ".dex", ".hap", ".bin"] \
            and ext in TestDictSource.exe_type:
        test_type = TestDictSource.exe_type.get(ext)
    elif ext in [".apk"] and ext in TestDictSource.exe_type:
        test_type = DeviceTestType.hap_test
    else:
        test_type = DeviceTestType.cpp_test
    return test_type


def _parse_module_name(config_file, file_name):
    """Return the module name of a test source.

    This is the base name of config_file when there is one, "report" when
    file_name contains "{" (an inline config string), else file_name.
    """
    if config_file:
        return get_filename_extension(config_file)[0]
    if "{" in file_name:
        return "report"
    return file_name


def _generate_config_file(device_labels, filename, ext, test_type):
    """Write a temporary json config for a test case without one.

    The config declares one "device" environment entry per label and a
    driver of type HostDrivenTestType.device_test running
    "<filename><ext>". It is written to "<filename>.json".

    Args:
        device_labels: iterable of device labels.
        filename: test source path without extension.
        ext: extension of the test source.
        test_type: ignored, the generated config always uses
            HostDrivenTestType.device_test.

    Returns:
        Tuple (path of the written file, driver type).
    """
    test_type = HostDrivenTestType.device_test
    environment = [{"type": "device", "label": label}
                   for label in device_labels]
    top_dict = {"environment": environment,
                "driver": {"type": test_type,
                           "py_file": "%s%s" % (filename, ext)}}

    save_file = os.path.join(os.path.dirname(filename),
                             "%s.json" % os.path.basename(filename))
    # O_TRUNC: without it, overwriting a longer existing file would leave
    # stale bytes behind the new content and break the json.
    save_file_open = os.open(save_file,
                             os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                             FilePermission.mode_755)
    with os.fdopen(save_file_open, "w") as save_handler:
        save_handler.write(json.dumps(top_dict, indent=4))
    return save_file, test_type


def parse_source_from_data(content):
    """Extract the module names of a json test file.

    Args:
        content: json text holding an object with an optional "suite" list
            whose items carry a "module_name".

    Returns:
        Tuple (list of module names in file order, dict mapping each
        module name to its suite item). Items without a module name are
        skipped because an empty test source cannot be resolved.
    """
    source_list = list()
    case_dict = dict()
    data = dict(json.loads(content))
    for item in data.get("suite", list()):
        name = item.get("module_name", "")
        if not name or not str(name).strip():
            LOG.warning("skip suite item without 'module_name': %s" % item)
            continue
        case_dict.update({name: item})
        source_list.append(name)
    return source_list, case_dict


class TestDictSource:
    """Class-level registries of the known test types.

    exe_type maps file extensions and test_type maps test type keys to
    driver types; both start as copies of the module defaults.
    """
    exe_type = copy.deepcopy(EXT_TYPE_DICT)
    test_type = copy.deepcopy(TEST_TYPE_DICT)

    @classmethod
    def reset(cls):
        """Restore both registries to fresh copies of the defaults."""
        cls.test_type = copy.deepcopy(TEST_TYPE_DICT)
        cls.exe_type = copy.deepcopy(EXT_TYPE_DICT)

    @classmethod
    def clear(cls):
        """Empty both registries in place."""
        cls.test_type.clear()
        cls.exe_type.clear()