#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import datetime
import os
import queue
import shutil
import json
import threading
from collections import Counter
from _core.context.impl import BaseScheduler
from _core.context.result import ExecuteFinished
from _core.utils import unique_id
from _core.utils import check_mode
from _core.utils import get_filename_extension

from _core.utils import get_cst_time
from _core.environment.manager_env import EnvironmentManager
from _core.error import ErrorMessage
from _core.exception import ExecuteTerminate
from _core.exception import LiteDeviceError
from _core.exception import DeviceError
from _core.executor.request import Request
from _core.executor.request import Descriptor

from _core.plugin import Config
from _core.constants import SchedulerType
from _core.plugin import Plugin
from _core.constants import TestExecType
from _core.constants import CKit
from _core.constants import ModeType
from _core.constants import DeviceLabelType
from _core.constants import ConfigConst
from _core.constants import ReportConst
from _core.executor.concurrent import DriversThread
from _core.executor.concurrent import ModuleThread
from _core.executor.concurrent import ExecuteMessage
from _core.executor.source import TestSetSource
from _core.executor.source import find_test_descriptors
from _core.executor.source import find_testdict_descriptors
from _core.logger import platform_logger
from _core.utils import convert_serial
from _core.report.reporter_helper import ExecInfo

from _core.context.life_stage import TaskStart, StageEvent
from _core.context.life_stage import TaskEnd
from _core.context.life_stage import CaseStart
from _core.context.life_stage import CaseEnd
from _core.context.upload import Uploader
from _core.context.option_util import get_device_options
from _core.context.tdd import TSD
from _core.exception import ParamError
from _core.context.handler import report_not_executed
from _core.context.life_stage import ILifeStageListener
from _core.variables import Variables

__all__ = ["Scheduler"]

LOG = platform_logger("Scheduler")


@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
class Scheduler(BaseScheduler):
    """
    The Scheduler is the main entry point for client code that wishes to
    discover and execute tests.
    """
    # lock handed to every ModuleThread started by this scheduler
    lock = threading.Lock()
    # queue read by _terminate() to obtain the terminate result
    terminate_result = queue.Queue()
    # device serial -> device object, filled by _append_used_devices()
    used_devices = {}

    def _do_execute_(self, task):
        """Dispatch the task to the device or host executor by exec type."""
        Scheduler.used_devices.clear()
        if task.config.exectype == TestExecType.device_test:
            self._device_test_execute(task)
        elif task.config.exectype == TestExecType.host_test:
            self._host_test_execute(task)
        else:
            LOG.info("Exec type %s is bypassed" % task.config.exectype)

    def __discover__(self, args):
        """Discover task to execute

        Builds a Task from ``args``, registers a TaskListener, notifies the
        TaskStart stage and attaches the root descriptor found for the
        task configuration.
        """
        from _core.executor.request import Task
        # the repeat count from the task arguments wins over the one in args
        repeat = Variables.config.taskargs.get(ConfigConst.repeat)
        if not repeat:
            repeat = args.get(ConfigConst.repeat, 1)
        args.update({ConfigConst.repeat: int(repeat)})
        config = Config()
        config.update(args)
        task = Task(drivers=[])
        task.init(config)
        self.add_life_stage_listener(TaskListener())
        action = args.get("action", "")
        task_start = TaskStart(action)
        self.notify_stage(task_start)
        root_descriptor = self._find_test_root_descriptor(task.config)
        task.set_root_descriptor(root_descriptor)
        return task

    def _device_test_execute(self, task):
        """Execute device test and always reset the used devices afterwards."""
        try:
            self.run_in_loop(task, run_func=self.run_dynamic_concurrent)
        finally:
            Scheduler.__reset_environment__(self.used_devices)

    def _host_test_execute(self, task, ):
        """Execute host test"""
        self.run_in_loop(task, run_func=self.run_host_test)

    def run_host_test(self, task, test_drivers, current_driver_threads, message_queue):
        """Start a driver thread (without environment) for the first driver.

        When the task uses the synchronize scheduler the call blocks until
        the started thread has finished.
        """
        # get test driver and device
        test_driver = test_drivers[0]

        # display executing progress
        self._display_executing_process(None, test_driver,
                                        test_drivers)

        # start driver thread
        thread = self._start_driver_thread(current_driver_threads, (
            None, message_queue, task, test_driver))

        if task.config.scheduler == SchedulerType.synchronize:
            thread.join()

    def generate_task_report(self, task):
        """Return an ExecInfo filled from the task config and used devices."""
        task_info = ExecInfo()
        test_type = getattr(task.config, "testtype", [])
        task_name = getattr(task.config, "task", "")
        if task_name:
            task_info.test_type = str(task_name).upper()
        else:
            task_info.test_type = ",".join(test_type) if test_type else "Test"
        if self.used_devices:
            serials = []
            platforms = []
            test_labels = []
            for serial, device in self.used_devices.items():
                serials.append(convert_serial(serial))
                platform = str(device.test_platform)
                test_label = str(device.label).capitalize()
                # platforms and labels are reported without duplicates
                if platform not in platforms:
                    platforms.append(platform)
                if test_label not in test_labels:
                    test_labels.append(test_label)
            task_info.device_name = ",".join(serials)
            task_info.platform = ",".join(platforms)
            task_info.device_label = ",".join(test_labels)
        else:
            task_info.device_name = "None"
            task_info.platform = "None"
            task_info.device_label = "None"
        task_info.repeat = getattr(task.config, ConfigConst.repeat, 1)
        task_info.test_time = task.config.start_time
        task_info.product_info = getattr(task, "product_info", "")

        return task_info

    def __allocate_environment__(self, options, test_driver):
        """Apply for an environment holding every device the driver needs.

        Loops until the number of applied devices equals the number of
        requested device options. The loop is left early when execution
        has stopped or the queue monitor thread is dead.

        Raises:
            DeviceError: a partial environment was applied and
                ``check_device_exist`` reports no matching device.
        """
        device_options = get_device_options(options, test_driver[1].source)
        environment = None
        env_manager = EnvironmentManager()
        while True:
            if not self.is_executing():
                break
            if not self.is_monitor_alive():
                LOG.error("Queue monitor thread is dead.")
                break
            environment = env_manager.apply_environment(device_options)
            applied_device_cnt = len(environment.devices)
            required_device_cnt = len(device_options)
            if applied_device_cnt == required_device_cnt:
                return environment
            else:
                env_manager.release_environment(environment)
                LOG.debug("'%s' is waiting available device",
                          test_driver[1].source.test_name)
                if env_manager.check_device_exist(device_options):
                    continue
                else:
                    raise DeviceError(
                        ErrorMessage.Common.Code_0101021.format(
                            required_device_cnt, applied_device_cnt))

        # NOTE(review): after a break this may be an environment that was
        # already released in a previous iteration - confirm callers cope.
        return environment

    @classmethod
    def __free_environment__(cls, environment):
        """Release the environment through the EnvironmentManager."""
        env_manager = EnvironmentManager()
        env_manager.release_environment(environment)

    @staticmethod
    def __reset_environment__(used_devices):
        """Reset the given used devices through the EnvironmentManager."""
        env_manager = EnvironmentManager()
        env_manager.reset_environment(used_devices)

    @classmethod
    def _check_device_spt(cls, kit, driver_request, device):
        """Mark the device task state False when its spt check fails.

        The check fails when the kit spt is empty, the product info of the
        request is not a dict, or the "Security Patch" entry of the product
        info does not pass ``_compare_spt_time``.
        """
        kit_spt = cls._parse_property_value(ConfigConst.spt,
                                            driver_request, kit)
        if not kit_spt:
            setattr(device, ConfigConst.task_state, False)
            LOG.error("Spt is empty", error_no="00108")
            return
        if getattr(driver_request, ConfigConst.product_info, ""):
            product_info = getattr(driver_request,
                                   ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
                setattr(device, ConfigConst.task_state, False)
                return
            device_spt = product_info.get("Security Patch", None)
            if not device_spt or not \
                    Scheduler._compare_spt_time(kit_spt, device_spt):
                LOG.error("The device %s spt is %s, "
                          "and the test case spt is %s, "
                          "which does not meet the requirements" %
                          (device.device_sn, device_spt, kit_spt),
                          error_no="00116")
                setattr(device, ConfigConst.task_state, False)
                return

    def _decc_task_setup(self, environment, task):
        """Run the task kits setup on every device of the environment.

        Returns:
            False when environment is None or any device ends up with a
            false task state, True otherwise.
        """
        config = Config()
        config.update(task.config.__dict__)
        config.environment = environment
        driver_request = Request(config=config)

        if environment is None:
            return False

        for device in environment.devices:
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device
            kits_copy = copy.deepcopy(task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not self.is_executing():
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
                if kit.__class__.__name__ == CKit.query and \
                        device.label in [DeviceLabelType.ipcamera]:
                    self._check_device_spt(kit, driver_request, device)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        for device in environment.devices:
            if not getattr(device, ConfigConst.task_state, True):
                return False

        # set product_info to self.task
        if getattr(driver_request, ConfigConst.product_info, "") and \
                not getattr(task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
            else:
                setattr(task, ConfigConst.product_info, product_info)
        return True

    def run_dynamic_concurrent(self, task, test_drivers, current_driver_threads, message_queue):
        """Allocate an environment for the first driver and start its thread.

        The driver is not started when its module does not need a retry
        (history report given), when no environment can be allocated, when
        execution has stopped, or when the environment check fails.
        """
        task_unused_env = []
        test_driver = test_drivers[0]
        self.notify_stage(CaseStart(test_driver[1].source.module_name, test_driver))

        if getattr(task.config, ConfigConst.history_report_path, ""):
            module_name = test_driver[1].source.module_name
            if not self._is_module_need_retry(task, module_name):
                self._display_executing_process(None, test_driver,
                                                test_drivers)
                LOG.info("%s are passed, no need to retry" % module_name)
                self._append_history_result(task, module_name)
                LOG.info("")
                return

        if getattr(task.config, ConfigConst.component_mapper, ""):
            module_name = test_driver[1].source.module_name
            self._component_task_setup(task, module_name)

        # get environment
        try:
            environment = self.__allocate_environment__(
                task.config.__dict__, test_driver)
        except DeviceError as exception:
            self._handle_device_error(exception, task, test_drivers)
            self.notify_stage(CaseEnd(test_driver[1].source.module_name, "Failed", exception.args))
            return
        if not self.is_executing():
            if environment:
                Scheduler.__free_environment__(environment)
            # execution has been stopped: the environment was just freed
            # (or is None), so it must not be checked or handed to a thread
            return

        if check_mode(ModeType.decc) or getattr(
                task.config, ConfigConst.check_device, False):
            LOG.info("Start to check environment: %s" %
                     environment.__get_serial__())
            status = self._decc_task_setup(environment, task)
            if not status:
                self.__free_environment__(environment)
                # NOTE(review): this list is local and the method returns
                # below, so _do_taskkit_teardown never sees the entry -
                # confirm whether the device state should be reset here.
                task_unused_env.append(environment)
                error_message = "Load Error[00116]"
                report_not_executed(task.config.report_path, [test_driver],
                                    error_message, task)
                return

            else:
                LOG.info("Environment %s check success",
                         environment.__get_serial__())

        # display executing progress
        self._display_executing_process(environment, test_driver,
                                        test_drivers)

        # add to used devices and set need_kit_setup attribute
        self._append_used_devices(environment, self.used_devices)

        # start driver thread
        self._start_driver_thread(current_driver_threads, (
            environment, message_queue, task, test_driver))

        self._do_taskkit_teardown(self.used_devices, task_unused_env)

    @classmethod
    def _append_history_result(cls, task, module_name):
        """Reuse the history result of a module that needs no retry.

        In decc mode the history result is appended to SuiteReporter,
        otherwise the history result file is copied into the "result"
        directory of the current report path.

        Raises:
            ParamError: the history result file does not exist.
        """
        history_report_path = getattr(
            task.config, ConfigConst.history_report_path, "")
        from _core.report.result_reporter import ResultReporter
        params = ResultReporter.get_task_info_params(
            history_report_path)

        if not params or not params[ReportConst.data_reports]:
            LOG.debug("Task info record data reports is empty")
            return

        report_data_dict = dict(params[ReportConst.data_reports])
        if module_name not in report_data_dict.keys():
            # fall back to the module name without its extension part
            module_name_ = str(module_name).split(".")[0]
            if module_name_ not in report_data_dict.keys():
                LOG.error("%s not in data reports" % module_name)
                return
            module_name = module_name_

        from xdevice import SuiteReporter
        if check_mode(ModeType.decc):
            virtual_report_path, report_result = SuiteReporter. \
                get_history_result_by_module(module_name)
            LOG.debug("Append history result: (%s, %s)" % (
                virtual_report_path, report_result))
            SuiteReporter.append_report_result(
                (virtual_report_path, report_result))
        else:
            history_execute_result = report_data_dict.get(module_name, "")
            LOG.info("Start copy %s" % history_execute_result)
            file_name = get_filename_extension(history_execute_result)[0]
            if os.path.exists(history_execute_result):
                result_dir = \
                    os.path.join(task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = "%s.xml" % os.path.join(
                    task.config.report_path, "result", file_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("Copy %s to %s" % (
                    history_execute_result, target_execute_result))
                if check_mode(ModeType.controller):
                    request = Request("", task.test_drivers[0][1], "", task.config)
                    exec_message = ExecuteMessage("", "", "", "")
                    exec_message.set_result(target_execute_result)
                    exec_message.set_request(request)
                    # TODO: should the "scheduling has already stopped"
                    # flow be handled here as well?
                    Uploader.upload_module_result(exec_message)
            else:
                error_msg = "Copy failed! %s not exists!" % \
                            history_execute_result
                raise ParamError(error_msg)

    def _handle_device_error(self, exception, task, test_drivers):
        """Log the device error and report the first driver as not executed."""
        test_driver = test_drivers[0]
        self._display_executing_process(None, test_driver, test_drivers)
        error_message = str(exception)
        LOG.exception(error_message, exc_info=False, error_no=exception.error_no)
        report_not_executed(task.config.report_path, [test_driver], error_message, task)
        test_source = test_driver[1].source
        case_id = test_source.module_name or test_source.test_name
        Uploader.upload_unavailable_result(error_message, case_id=case_id)
        LOG.info("")

    def _start_driver_thread(self, current_driver_threads, thread_params):
        """Create, register and start the thread running one test driver.

        Args:
            current_driver_threads: dict of thread name -> running thread
            thread_params: tuple (environment, message_queue, task,
                test_driver)

        Returns:
            the started thread (ModuleThread for the module scheduler,
            DriversThread otherwise)
        """
        environment, message_queue, task, test_driver = thread_params

        if task.config.scheduler == SchedulerType.module:
            driver_thread = ModuleThread(test_driver, task, environment,
                                         message_queue, self.lock)
        else:
            driver_thread = DriversThread(test_driver, task, environment,
                                          message_queue)
        thread_name = self._get_thread_name(current_driver_threads)
        driver_thread.daemon = True
        driver_thread.name = thread_name
        driver_thread.set_listeners(self.__create_listeners__(task))
        driver_thread.start()
        current_driver_threads.setdefault(thread_name, driver_thread)
        LOG.info(f"Driver executing in thread {driver_thread.ident}")
        LOG.info(f"Thread {thread_name} execute started")
        return driver_thread

    @classmethod
    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
        """Tear down the task kits of used devices and reset unused ones.

        Devices that still need kit setup are skipped. Teardown errors
        are only logged so that the remaining kits are still torn down.
        """
        for device in used_devices.values():
            if getattr(device, ConfigConst.need_kit_setup, True):
                continue

            for kit in getattr(device, ConfigConst.task_kits, []):
                try:
                    kit.__teardown__(device)
                except Exception as error:
                    LOG.debug("Do task kit teardown: %s" % error)
            setattr(device, ConfigConst.task_kits, [])
            setattr(device, ConfigConst.need_kit_setup, True)

        for environment in task_unused_env:
            for device in environment.devices:
                setattr(device, ConfigConst.task_state, True)
                setattr(device, ConfigConst.need_kit_setup, True)

    def _display_executing_process(self, environment, test_driver,
                                   test_drivers):
        """Log the "[current / total]" progress line for the test driver."""
        source_content = test_driver[1].source.source_file or \
            test_driver[1].source.source_string
        if environment is None:
            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
                     (self.test_number - len(test_drivers) + 1,
                      self.test_number, source_content,
                      test_driver[1].source.test_type))
            return

        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
                 (self.test_number - len(test_drivers) + 1,
                  self.test_number, source_content,
                  environment.__get_serial__(),
                  test_driver[1].source.test_type))

    @classmethod
    def _get_thread_name(cls, current_driver_threads):
        """Return a timestamp based name not yet used by a running thread."""
        thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
        while thread_id in current_driver_threads.keys():
            thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
        return thread_id

    @classmethod
    def _append_used_devices(cls, environment, used_devices):
        """Record the devices of the environment by serial, without overwrite."""
        if environment is not None:
            for device in environment.devices:
                device_serial = device.__get_serial__() if device else "None"
                if device_serial and device_serial not in used_devices.keys():
                    used_devices[device_serial] = device

    @classmethod
    def _reset_environment(cls, environment="", config_file=""):
        """Stop the environment manager and create it again with new args."""
        env_manager = EnvironmentManager()
        env_manager.env_stop()
        EnvironmentManager(environment, config_file)

    @classmethod
    def _restore_environment(cls):
        """Stop the environment manager and create it again with defaults."""
        env_manager = EnvironmentManager()
        env_manager.env_stop()
        EnvironmentManager()

    def _on_task_error_(self, task, exception: Exception):
        """Upload the exception args as an unavailable result."""
        Uploader.upload_unavailable_result(str(exception.args))

    def _on_execute_finished_(self, task, result: ExecuteFinished):
        """Restore the environment if needed and publish the task end."""
        TSD.reset_test_dict_source()
        if getattr(task.config, ConfigConst.test_environment, "") or \
                getattr(task.config, ConfigConst.configfile, ""):
            self._restore_environment()

        self.notify_stage(TaskEnd(task.config.report_path, result.unavailable, result.error_msg))
        Uploader.upload_task_result(task, result.error_msg)
        self.upload_report_end()

    @staticmethod
    def _find_test_root_descriptor(config):
        """Build the root descriptor of the tests selected by the config.

        Component options (subsystems, parts, component base kit) are
        checked first, then testdict, then testfile/testlist/task/testcase.

        Raises:
            ParamError: no test set is configured.
        """
        if getattr(config, ConfigConst.task, None) or \
                getattr(config, ConfigConst.testargs, None):
            Scheduler._pre_component_test(config)

        if getattr(config, ConfigConst.subsystems, "") or \
                getattr(config, ConfigConst.parts, "") or \
                getattr(config, ConfigConst.component_base_kit, ""):
            uid = unique_id("Scheduler", "component")
            if config.subsystems or config.parts:
                test_set = (config.subsystems, config.parts)
            else:
                kit = getattr(config, ConfigConst.component_base_kit)
                test_set = kit.get_white_list()

            root = Descriptor(uuid=uid, name="component",
                              source=TestSetSource(test_set),
                              container=True)

            root.children = find_test_descriptors(config)
            return root
        # read test list from testdict
        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
                config, ConfigConst.testfile, "") == "":
            uid = unique_id("Scheduler", "testdict")
            root = Descriptor(uuid=uid, name="testdict",
                              source=TestSetSource(config.testdict),
                              container=True)
            root.children = find_testdict_descriptors(config)
            return root

        # read test list from testfile, testlist or task
        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
            config, ConfigConst.testlist, "") or getattr(
            config, ConfigConst.task, "") or getattr(
            config, ConfigConst.testcase)
        if test_set:
            fname, _ = get_filename_extension(test_set)
            uid = unique_id("Scheduler", fname)
            root = Descriptor(uuid=uid, name=fname,
                              source=TestSetSource(test_set), container=True)
            if config.scheduler == SchedulerType.module:
                Scheduler._find_children_module(root, config)
            else:
                Scheduler._find_children_default(root, config)
            return root
        else:
            raise ParamError(ErrorMessage.Common.Code_0101022)

    @staticmethod
    def _find_children_default(root, config):
        """Attach the discovered test descriptors to root unchanged."""
        root.children = find_test_descriptors(config)

    @staticmethod
    def _find_children_module(root, config):
        """Attach descriptors sorted by subsystem plus their common kits.

        Sets ``root.children``, ``root.common_kits`` (subsystem -> kit
        configs made of the entries seen more than once in that subsystem)
        and ``root.task_list`` (subsystem -> number of descriptors).
        """
        desc = find_test_descriptors(config)
        common_kits = {}
        task_list = {}
        all_data = {}
        for i in desc:
            module_subsystem = i.source.module_subsystem
            if module_subsystem not in task_list:
                task_list.update({module_subsystem: 1})
            else:
                task_list.update({module_subsystem: task_list.get(module_subsystem) + 1})
            if module_subsystem not in all_data:
                all_data[module_subsystem] = {
                    "run-command": [],
                    'pre-push': [],
                    "push": []
                }
            kits = Scheduler._get_test_kits(i.source.config_file)
            for kit in kits:
                if kit.get("type") == "AppInstallKit":
                    continue
                if kit.get("type") == "ShellKit":
                    shell_kit = kit.get("run-command", [])
                    for command in shell_kit:
                        # only remount/mkdir commands are collected
                        if "remount" in command or "mkdir" in command:
                            all_data[module_subsystem].get("run-command").append(command)
                if kit.get("type") == "PushKit":
                    pre_push_kit = kit.get("pre-push", [])
                    for command in pre_push_kit:
                        if "remount" in command or "mkdir" in command:
                            all_data[module_subsystem].get("pre-push").append(command)
                    push_kit = kit.get("push", [])
                    all_data[module_subsystem].get("push").extend(push_kit)
        for _, value in all_data.items():
            for key1, value1 in value.items():
                # keep only the entries collected more than once
                common = value1
                count = Counter(common)
                new_common = [k for k, v in count.items() if v > 1]
                value[key1] = new_common
        for key, value in all_data.items():
            common_kit_tem = [{
                'type': 'ShellKit',
                'run-command': value.get("run-command", [])
            }, {
                'type': 'CommonPushKit',
                'pre-push': value.get("pre-push", []),
                "push": value.get("push", [])
            }]
            common_kits.update({key: common_kit_tem})
        LOG.debug(common_kits)
        LOG.debug(task_list)
        root.children = sorted(desc, key=lambda x: x.source.module_subsystem)
        setattr(root, "common_kits", common_kits)
        setattr(root, "task_list", task_list)

    @staticmethod
    def _get_test_kits(test_source):
        """Return the kits parsed from the json config, "" on ParamError."""
        try:
            from _core.testkit.json_parser import JsonParser
            json_config = JsonParser(test_source)
            return json_config.get_kits()
        except ParamError as error:
            LOG.error(error, error_no=error.error_no)
            return ""

    @classmethod
    def _terminate(cls):
        """Return the next item of the terminate_result queue."""
        LOG.info("Start to terminate execution")
        return Scheduler.terminate_result.get()

    @classmethod
    def upload_report_end(cls):
        """Remove the temporary json file, if set, and upload the report end."""
        if getattr(cls, "tmp_json", None):
            os.remove(cls.tmp_json)
            del cls.tmp_json
        Uploader.upload_report_end()

    @classmethod
    def _is_module_need_retry(cls, task, module_name):
        """Return True when the module is recorded as failed in history.

        The module is matched by its full name or by the part before the
        first "." of its name.
        """
        failed_flag = False
        if check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            for module, _ in SuiteReporter.get_failed_case_list():
                if module_name == module or str(module_name).split(
                        ".")[0] == module:
                    failed_flag = True
                    break
        else:
            from xdevice import ResultReporter
            history_report_path = \
                getattr(task.config, ConfigConst.history_report_path, "")
            params = ResultReporter.get_task_info_params(history_report_path)
            if params and params[ReportConst.unsuccessful_params]:
                if dict(params[ReportConst.unsuccessful_params]).get(
                        module_name, []):
                    failed_flag = True
                elif dict(params[ReportConst.unsuccessful_params]).get(
                        str(module_name).split(".")[0], []):
                    failed_flag = True
        return failed_flag

    @classmethod
    def _compare_spt_time(cls, kit_spt, device_spt):
        """Compare the "year-month" parts of the kit and device spt.

        Returns:
            True when the kit spt year is before the device spt year, or
            the kit spt is at most two months after the device spt;
            False otherwise, for empty values and for unparsable dates.
        """
        if not kit_spt or not device_spt:
            return False
        try:
            kit_time = str(kit_spt).split("-")[:2]
            device_time = str(device_spt).split("-")[:2]
            k_spt = datetime.datetime.strptime(
                "-".join(kit_time), "%Y-%m")
            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
        except ValueError as value_error:
            LOG.debug("Date format is error, %s" % value_error.args)
            return False
        month_interval = int(k_spt.month) - int(d_spt.month)
        year_interval = int(k_spt.year) - int(d_spt.year)
        LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
        if year_interval < 0:
            return True
        if year_interval == 0 and month_interval in range(-11, 3):
            return True
        if year_interval == 1 and month_interval + 12 in (1, 2):
            return True
        return False

    @classmethod
    def _parse_property_value(cls, property_name, driver_request, kit):
        """Look up a property in the test args, falling back to the kit.

        The pass-through json of the test args is consulted first, then
        the test args themselves; an empty result falls back to
        ``kit.properties``.
        """
        test_args = copy.deepcopy(
            driver_request.config.get(ConfigConst.testargs, dict()))
        property_value = ""
        if ConfigConst.pass_through in test_args.keys():
            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
            property_value = pt_dict.get(property_name, None)
        # keys() must be called: testing membership on the bound method
        # itself raises TypeError
        elif property_name in test_args.keys():
            property_value = test_args.get(property_name, None)
        return property_value if property_value else \
            kit.properties.get(property_name, None)

    @classmethod
    def _pre_component_test(cls, config):
        """Store the component kit on the config when it has a white list."""
        if not config.kits:
            return
        cur_kit = None
        for kit in config.kits:
            if kit.__class__.__name__ == CKit.component:
                cur_kit = kit
                break
        if not cur_kit:
            return
        get_white_list = getattr(cur_kit, "get_white_list", None)
        if not callable(get_white_list):
            return
        subsystems, parts = get_white_list()
        if not subsystems and not parts:
            return
        setattr(config, ConfigConst.component_base_kit, cur_kit)

    @classmethod
    def _component_task_setup(cls, task, module_name):
        """Check whether a device supports the component of the module.

        The component kit cache is checked first; on a miss the kit is set
        up on every device of every manager and the cache is read again.
        A miss after that is only logged as a warning.
        """
        component_kit = task.config.get(ConfigConst.component_base_kit, None)
        if not component_kit:
            # only -p/-s was given: the caller does not care which
            # components are supported and only wants to run the use
            # cases of the current component
            return
        LOG.debug("Start component task setup")
        _component_mapper = task.config.get(ConfigConst.component_mapper)
        _subsystem, _part = _component_mapper.get(module_name)

        is_hit = False
        # find in cache. if not find, update cache
        cache_subsystem, cache_part = component_kit.get_cache()
        # the part is matched against the part cache, not the subsystem one
        if _subsystem in cache_subsystem or _part in cache_part:
            is_hit = True
        if not is_hit:
            env_manager = EnvironmentManager()
            for _, manager in env_manager.managers.items():
                if getattr(manager, "devices_list", []):
                    for device in manager.devices_list:
                        component_kit.__setup__(device)
            cache_subsystem, cache_part = component_kit.get_cache()
            if _subsystem in cache_subsystem or _part in cache_part:
                is_hit = True
        if not is_hit:
            LOG.warning("%s are skipped, no suitable component found. "
                        "Require subsystem=%s part=%s, no device match this"
                        % (module_name, _subsystem, _part))


class TaskListener(ILifeStageListener):
    """Forward life stage events to ``Task.life_stage_listener`` if set."""

    def __on_event__(self, stage_event: StageEvent):
        from xdevice import Task
        if Task.life_stage_listener:
            data = stage_event.get_data()
            Task.life_stage_listener(data)