1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import json 21import copy 22import stat 23from collections import namedtuple 24 25from _core.constants import DeviceTestType 26from _core.constants import ModeType 27from _core.constants import HostDrivenTestType 28from _core.constants import FilePermission 29from _core.constants import ConfigConst 30from _core.exception import ParamError 31from _core.logger import platform_logger 32from _core.utils import get_filename_extension 33from _core.utils import is_config_str 34from _core.utils import unique_id 35 36__all__ = ["TestSetSource", "TestSource", "find_test_descriptors", 37 "find_testdict_descriptors", "TestDictSource"] 38 39TestSetSource = namedtuple('TestSetSource', 'set') 40TestSource = namedtuple('TestSource', 'source_file source_string config_file ' 41 'test_name test_type module_name') 42 43TEST_TYPE_DICT = {"DEX": DeviceTestType.dex_test, 44 "HAP": DeviceTestType.hap_test, 45 "APK": DeviceTestType.hap_test, 46 "PYT": HostDrivenTestType.device_test, 47 "JST": DeviceTestType.jsunit_test, 48 "OHJST": DeviceTestType.oh_jsunit_test, 49 "CXX": DeviceTestType.cpp_test, 50 "BIN": DeviceTestType.lite_cpp_test} 51EXT_TYPE_DICT = {".dex": DeviceTestType.dex_test, 52 ".hap": DeviceTestType.hap_test, 53 ".apk": DeviceTestType.hap_test, 54 ".py": HostDrivenTestType.device_test, 55 ".js": DeviceTestType.jsunit_test, 56 ".bin": DeviceTestType.lite_cpp_test, 57 "default": DeviceTestType.cpp_test} 58PY_SUFFIX = ".py" 59PYD_SUFFIX = ".pyd" 60MODULE_CONFIG_SUFFIX = ".json" 61MODULE_INFO_SUFFIX = ".moduleInfo" 62MAX_DIR_DEPTH = 6 63LOG = platform_logger("TestSource") 64 65 66def find_test_descriptors(config): 67 if not config.testfile and not config.testlist and not config.task and \ 68 not config.testcase and not config.subsystems and \ 69 not config.parts: 70 return None 71 72 # get test sources 73 testcases_dirs = _get_testcases_dirs(config) 74 test_sources = _get_test_sources(config, testcases_dirs) 75 LOG.debug("Test sources: %s", test_sources) 76 77 # normalize test sources 78 test_sources = _normalize_test_sources(testcases_dirs, test_sources, 79 config) 80 81 # make test descriptors 82 test_descriptors = _make_test_descriptors_from_testsources(test_sources, 83 config) 84 return test_descriptors 85 86 87def _get_testcases_dirs(config): 88 from xdevice import Variables 89 # add config.testcases_path and its subfolders 90 testcases_dirs = [] 91 if getattr(config, ConfigConst.testcases_path, ""): 92 testcases_dirs = [config.testcases_path] 93 _append_subfolders(config.testcases_path, testcases_dirs) 94 95 # add inner testcases dir and its subfolders 96 inner_testcases_dir = os.path.abspath(os.path.join( 97 Variables.top_dir, "testcases")) 98 if getattr(config, ConfigConst.testcases_path, "") and os.path.normcase( 99 config.testcases_path) != os.path.normcase(inner_testcases_dir): 100 testcases_dirs.append(inner_testcases_dir) 101 _append_subfolders(inner_testcases_dir, testcases_dirs) 102 103 # add execution dir and top dir 104 testcases_dirs.append(Variables.exec_dir) 105 if os.path.normcase(Variables.exec_dir) != os.path.normcase( 106 Variables.top_dir): 107 testcases_dirs.append(Variables.top_dir) 108 109 LOG.debug("Testcases directories: %s", testcases_dirs) 110 return testcases_dirs 111 112 113def _append_subfolders(testcases_path, testcases_dirs): 114 for root, dirs, _ in os.walk(testcases_path): 115 for sub_dir in dirs: 116 testcases_dirs.append(os.path.abspath(os.path.join(root, sub_dir))) 117 118 119def find_testdict_descriptors(config): 120 from xdevice import Variables 121 if getattr(config, ConfigConst.testdict, "") == "": 122 return None 123 testdict = config.testdict 124 test_descriptors = [] 125 for test_type_key, files in testdict.items(): 126 for file_name in files: 127 if not os.path.isabs(file_name): 128 file_name = os.path.join(Variables.exec_dir, file_name) 129 if os.path.isfile(file_name) and test_type_key in \ 130 TestDictSource.test_type.keys(): 131 desc = _make_test_descriptor(os.path.abspath(file_name), 132 test_type_key) 133 if desc is not None: 134 test_descriptors.append(desc) 135 if not test_descriptors: 136 raise ParamError("test source is none", error_no="00110") 137 return test_descriptors 138 139 140def _append_component_test_source(config, testcases_dir, test_sources): 141 subsystem_list = config.subsystems if config.subsystems else list() 142 part_list = config.parts if config.parts else list() 143 module_info_files = _get_component_info_file(testcases_dir) 144 result_dict = dict() 145 for info_file in module_info_files: 146 flags = os.O_RDONLY 147 modes = stat.S_IWUSR | stat.S_IRUSR 148 with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler: 149 result_dict.update(json.load(f_handler)) 150 module_name = result_dict.get("module", "") 151 part_name = result_dict.get("part", "") 152 subsystem_name = result_dict.get("subsystem", "") 153 if not module_name or not part_name or not subsystem_name: 154 continue 155 module_config_file = \ 156 os.path.join(os.path.dirname(info_file), module_name) 157 is_append = True 158 if subsystem_list or part_list: 159 if part_name not in part_list and \ 160 subsystem_name not in subsystem_list: 161 is_append = False 162 if is_append: 163 getattr(config, ConfigConst.component_mapper, dict()).update( 164 {module_name: (subsystem_name, part_name)}) 165 test_sources.append(module_config_file) 166 167 168def _get_test_sources(config, testcases_dirs): 169 test_sources = [] 170 171 # get test sources from testcases_dirs 172 if not config.testfile and not config.testlist and not config.testcase \ 173 and not config.subsystems and not config.parts and not \ 174 getattr(config, ConfigConst.component_base_kit, "") and \ 175 config.task: 176 for testcases_dir in testcases_dirs: 177 _append_module_test_source(testcases_dir, test_sources) 178 return test_sources 179 180 # get test sources from config.testlist 181 if getattr(config, ConfigConst.testlist, ""): 182 for test_source in config.testlist.split(";"): 183 if test_source.strip(): 184 test_sources.append(test_source.strip()) 185 return test_sources 186 187 # get test sources from config.testfile 188 if getattr(config, ConfigConst.testfile, ""): 189 test_file = _get_test_file(config, testcases_dirs) 190 flags = os.O_RDONLY 191 modes = stat.S_IWUSR | stat.S_IRUSR 192 with os.fdopen(os.open(test_file, flags, modes), "r") as file_content: 193 for line in file_content: 194 if line.strip(): 195 test_sources.append(line.strip()) 196 197 # get test sources from config.testcase 198 if getattr(config, ConfigConst.testcase, ""): 199 for test_source in config.testcase.split(";"): 200 if test_source.strip(): 201 test_sources.append(test_source.strip()) 202 return test_sources 203 204 if getattr(config, ConfigConst.subsystems, []) or \ 205 getattr(config, ConfigConst.parts, []) or \ 206 getattr(config, ConfigConst.component_base_kit, ""): 207 setattr(config, ConfigConst.component_mapper, dict()) 208 for testcases_dir in testcases_dirs: 209 _append_component_test_source(config, testcases_dir, test_sources) 210 return test_sources 211 return test_sources 212 213 214def _append_module_test_source(testcases_path, test_sources): 215 if not os.path.isdir(testcases_path): 216 return 217 for item in os.listdir(testcases_path): 218 item_path = os.path.join(testcases_path, item) 219 if os.path.isfile(item_path) and item_path.endswith( 220 MODULE_CONFIG_SUFFIX): 221 test_sources.append(item_path) 222 223 224def _get_test_file(config, testcases_dirs): 225 if os.path.isabs(config.testfile): 226 if os.path.exists(config.testfile): 227 return config.testfile 228 else: 229 raise ParamError("test file '%s' not exists" % config.testfile, 230 error_no="00110") 231 232 for testcases_dir in testcases_dirs: 233 test_file = os.path.join(testcases_dir, config.testfile) 234 if os.path.exists(test_file): 235 return test_file 236 237 raise ParamError("test file '%s' not exists" % config.testfile) 238 239 240def _normalize_test_sources(testcases_dirs, test_sources, config): 241 norm_test_sources = [] 242 for test_source in test_sources: 243 append_result = False 244 for testcases_dir in testcases_dirs: 245 # append test source absolute path 246 append_result = _append_norm_test_source( 247 norm_test_sources, test_source, testcases_dir, config) 248 if append_result: 249 break 250 251 # append test source if no corresponding file founded 252 if not append_result: 253 norm_test_sources.append(test_source) 254 if not norm_test_sources: 255 raise ParamError("test source not found") 256 return norm_test_sources 257 258 259def _append_norm_test_source(norm_test_sources, test_source, testcases_dir, 260 config): 261 # get norm_test_source 262 norm_test_source = test_source 263 if not os.path.isabs(test_source): 264 norm_test_source = os.path.abspath( 265 os.path.join(testcases_dir, test_source)) 266 267 # find py or pyd for test case input 268 if config.testcase and not config.testlist: 269 if os.path.isfile("%s%s" % (norm_test_source, PY_SUFFIX)): 270 norm_test_sources.append( 271 "%s%s" % (norm_test_source, PY_SUFFIX)) 272 return True 273 elif os.path.isfile("%s%s" % (norm_test_source, PYD_SUFFIX)): 274 norm_test_sources.append( 275 "%s%s" % (norm_test_source, PYD_SUFFIX)) 276 return True 277 return False 278 279 # append to norm_test_sources 280 if os.path.isfile(norm_test_source): 281 norm_test_sources.append(norm_test_source) 282 return True 283 elif os.path.isfile("%s%s" % (norm_test_source, MODULE_CONFIG_SUFFIX)): 284 norm_test_sources.append("%s%s" % (norm_test_source, 285 MODULE_CONFIG_SUFFIX)) 286 return True 287 return False 288 289 290def _make_test_descriptor(file_path, test_type_key): 291 from _core.executor.request import Descriptor 292 if test_type_key is None: 293 return None 294 295 # get params 296 filename, _ = get_filename_extension(file_path) 297 uid = unique_id("TestSource", filename) 298 test_type = TestDictSource.test_type[test_type_key] 299 config_file = _get_config_file( 300 os.path.join(os.path.dirname(file_path), filename)) 301 302 module_name = _parse_module_name(config_file, filename) 303 # make test descriptor 304 desc = Descriptor(uuid=uid, name=filename, 305 source=TestSource(file_path, "", config_file, filename, 306 test_type, module_name)) 307 return desc 308 309 310def _get_test_driver(test_source): 311 try: 312 from _core.testkit.json_parser import JsonParser 313 json_config = JsonParser(test_source) 314 return json_config.get_driver_type() 315 except ParamError as error: 316 LOG.error(error, error_no=error.error_no) 317 return "" 318 319 320def _make_test_descriptors_from_testsources(test_sources, config): 321 test_descriptors = [] 322 323 for test_source in test_sources: 324 filename, ext = test_source.split()[0], "str" 325 if os.path.isfile(test_source): 326 filename, ext = get_filename_extension(test_source) 327 328 test_driver = config.testdriver 329 if is_config_str(test_source): 330 test_driver = _get_test_driver(test_source) 331 332 # get params 333 config_file = _get_config_file( 334 os.path.join(os.path.dirname(test_source), filename), ext, config) 335 test_type = _get_test_type(config_file, test_driver, ext) 336 if not config_file: 337 if getattr(config, ConfigConst.testcase, "") and not \ 338 getattr(config, ConfigConst.testlist): 339 LOG.debug("Can't find the json file of config") 340 from xdevice import Scheduler 341 if Scheduler.device_labels: 342 config_file, test_type = _generate_config_file( 343 Scheduler.device_labels, 344 os.path.join(os.path.dirname(test_source), filename), 345 ext, test_type) 346 setattr(Scheduler, "tmp_json", config_file) 347 LOG.debug("Generate temp json success: %s" % config_file) 348 desc = _create_descriptor(config_file, filename, test_source, 349 test_type, config) 350 if desc: 351 test_descriptors.append(desc) 352 353 return test_descriptors 354 355 356def _create_descriptor(config_file, filename, test_source, test_type, config): 357 from xdevice import Scheduler 358 from _core.executor.request import Descriptor 359 360 error_message = "" 361 if not test_type: 362 error_message = "no driver to execute '%s'" % test_source 363 LOG.error(error_message, error_no="00112") 364 if Scheduler.mode != ModeType.decc: 365 return None 366 367 # create Descriptor 368 uid = unique_id("TestSource", filename) 369 module_name = _parse_module_name(config_file, filename) 370 desc = Descriptor(uuid=uid, name=filename, 371 source=TestSource(test_source, "", config_file, 372 filename, test_type, module_name)) 373 if not os.path.isfile(test_source): 374 if is_config_str(test_source): 375 desc = Descriptor(uuid=uid, name=filename, 376 source=TestSource("", test_source, config_file, 377 filename, test_type, 378 module_name)) 379 else: 380 if config.testcase and not config.testlist: 381 error_message = "test case '%s' or '%s' not exists" % ( 382 "%s%s" % (test_source, PY_SUFFIX), "%s%s" % ( 383 test_source, PYD_SUFFIX)) 384 error_no = "00103" 385 else: 386 error_message = "test source '%s' or '%s' not exists" % ( 387 test_source, "%s%s" % (test_source, MODULE_CONFIG_SUFFIX)) 388 error_no = "00102" 389 if Scheduler.mode != ModeType.decc: 390 raise ParamError(error_message, error_no=error_no) 391 392 if Scheduler.mode == ModeType.decc and error_message: 393 Scheduler.report_not_executed(config.report_path, [("", desc)], 394 error_message) 395 return None 396 397 return desc 398 399 400def _get_config_file(filename, ext=None, config=None): 401 config_file = None 402 if os.path.exists("%s%s" % (filename, MODULE_CONFIG_SUFFIX)): 403 config_file = "%s%s" % (filename, MODULE_CONFIG_SUFFIX) 404 return config_file 405 if ext and os.path.exists("%s%s%s" % (filename, ext, 406 MODULE_CONFIG_SUFFIX)): 407 config_file = "%s%s%s" % (filename, ext, MODULE_CONFIG_SUFFIX) 408 return config_file 409 if config and getattr(config, "testcase", "") and not getattr( 410 config, "testlist"): 411 return _get_testcase_config_file(filename) 412 413 return config_file 414 415 416def _get_testcase_config_file(filename): 417 depth = 1 418 dirname = os.path.dirname(filename) 419 while dirname and depth < MAX_DIR_DEPTH: 420 for item in os.listdir(dirname): 421 item_path = os.path.join(dirname, item) 422 if os.path.isfile(item_path) and item.endswith( 423 MODULE_CONFIG_SUFFIX): 424 return item_path 425 depth += 1 426 dirname = os.path.dirname(dirname) 427 return None 428 429 430def _get_component_info_file(entry_dir): 431 module_files = [] 432 if not os.path.isdir(entry_dir): 433 return module_files 434 for item in os.listdir(entry_dir): 435 item_path = os.path.join(entry_dir, item) 436 if os.path.isfile(item_path) and item_path.endswith( 437 MODULE_INFO_SUFFIX): 438 module_files.append(item_path) 439 return module_files 440 441 442def _get_test_type(config_file, test_driver, ext): 443 if test_driver: 444 return test_driver 445 446 if config_file: 447 if not os.path.exists(config_file): 448 LOG.error("Config file '%s' not exists" % config_file, 449 error_no="00110") 450 return "" 451 return _get_test_driver(config_file) 452 if ext in [".py", ".js", ".dex", ".hap", ".bin"] \ 453 and ext in TestDictSource.exe_type.keys(): 454 test_type = TestDictSource.exe_type[ext] 455 elif ext in [".apk"] and ext in TestDictSource.exe_type.keys(): 456 test_type = DeviceTestType.hap_test 457 else: 458 test_type = DeviceTestType.cpp_test 459 return test_type 460 461 462def _parse_module_name(config_file, file_name): 463 if config_file: 464 return get_filename_extension(config_file)[0] 465 else: 466 if "{" in file_name: 467 return "report" 468 return file_name 469 470 471def _generate_config_file(device_labels, filename, ext, test_type): 472 if test_type not in [HostDrivenTestType.device_test]: 473 test_type = HostDrivenTestType.device_test 474 top_dict = {"environment": [], "driver": {"type": test_type, 475 "py_file": "%s%s" % (filename, ext)}} 476 for label in device_labels: 477 device_json_list = top_dict.get("environment") 478 device_json_list.append({"type": "device", "label": label}) 479 480 save_file = os.path.join(os.path.dirname(filename), 481 "%s.json" % os.path.basename(filename)) 482 save_file_open = \ 483 os.open(save_file, os.O_WRONLY | os.O_CREAT, FilePermission.mode_755) 484 with os.fdopen(save_file_open, "w") as save_handler: 485 save_handler.write(json.dumps(top_dict, indent=4)) 486 return save_file, test_type 487 488 489class TestDictSource: 490 exe_type = dict() 491 test_type = dict() 492 493 @classmethod 494 def reset(cls): 495 cls.test_type = copy.deepcopy(TEST_TYPE_DICT) 496 cls.exe_type = copy.deepcopy(EXT_TYPE_DICT) 497