1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import datetime 21import os 22import queue 23import time 24import uuid 25import shutil 26import json 27from xml.etree import ElementTree 28 29from _core.utils import unique_id 30from _core.utils import check_mode 31from _core.utils import get_sub_path 32from _core.utils import get_filename_extension 33from _core.utils import convert_serial 34from _core.utils import convert_mac 35from _core.utils import get_instance_name 36from _core.utils import is_config_str 37from _core.utils import check_result_report 38from _core.utils import get_cst_time 39from _core.environment.manager_env import EnvironmentManager 40from _core.environment.manager_env import DeviceSelectionOption 41from _core.exception import ParamError 42from _core.exception import ExecuteTerminate 43from _core.exception import LiteDeviceError 44from _core.exception import DeviceError 45from _core.interface import LifeCycle 46from _core.executor.request import Request 47from _core.executor.request import Descriptor 48from _core.plugin import get_plugin 49from _core.plugin import Plugin 50from _core.plugin import Config 51from _core.report.reporter_helper import ExecInfo 52from _core.report.reporter_helper import ReportConstant 53from _core.report.reporter_helper import Case 54from _core.report.reporter_helper import DataHelper 55from _core.constants import TestExecType 56from _core.constants import CKit 57from _core.constants import ModeType 58from _core.constants import DeviceLabelType 59from _core.constants import SchedulerType 60from _core.constants import ListenerType 61from _core.constants import ConfigConst 62from _core.constants import ReportConst 63from _core.constants import HostDrivenTestType 64from _core.constants import LifeStage 65from _core.executor.concurrent import DriversThread 66from _core.executor.concurrent import QueueMonitorThread 67from _core.executor.concurrent import DriversDryRunThread 68from _core.executor.source import TestSetSource 69from _core.executor.source import find_test_descriptors 70from _core.executor.source import find_testdict_descriptors 71from _core.executor.source import TestDictSource 72from _core.logger import platform_logger 73from _core.logger import add_task_file_handler 74from _core.logger import remove_task_file_handler 75from _core.logger import add_encrypt_file_handler 76from _core.logger import remove_encrypt_file_handler 77 78__all__ = ["Scheduler"] 79LOG = platform_logger("Scheduler") 80 81MAX_VISIBLE_LENGTH = 150 82 83 84@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler) 85class Scheduler(object): 86 """ 87 The Scheduler is the main entry point for client code that wishes to 88 discover and execute tests. 89 """ 90 # factory params 91 is_execute = True 92 terminate_result = queue.Queue() 93 upload_address = "" 94 task_type = "" 95 task_name = "" 96 mode = "" 97 proxy = None 98 99 # command_queue to store test commands 100 command_queue = [] 101 max_command_num = 50 102 # the number of tests in current task 103 test_number = 0 104 device_labels = [] 105 repeat_index = 0 106 auto_retry = -1 107 is_need_auto_retry = False 108 109 queue_monitor_thread = None 110 111 def __discover__(self, args): 112 """Discover task to execute""" 113 from _core.executor.request import Task 114 config = Config() 115 config.update(args) 116 task = Task(drivers=[]) 117 task.init(config) 118 119 Scheduler.call_life_stage_action(stage=LifeStage.task_start) 120 121 root_descriptor = self._find_test_root_descriptor(task.config) 122 task.set_root_descriptor(root_descriptor) 123 return task 124 125 def __execute__(self, task): 126 error_message = "" 127 unavailable = 0 128 try: 129 Scheduler.is_execute = True 130 if Scheduler.command_queue: 131 LOG.debug("Run command: {}".format(convert_mac(Scheduler.command_queue[-1]))) 132 run_command = Scheduler.command_queue.pop() 133 task_id = str(uuid.uuid1()).split("-")[0] 134 Scheduler.command_queue.append((task_id, run_command, 135 task.config.report_path)) 136 if len(Scheduler.command_queue) > self.max_command_num: 137 Scheduler.command_queue.pop(0) 138 139 if getattr(task.config, ConfigConst.test_environment, ""): 140 self._reset_environment(task.config.get( 141 ConfigConst.test_environment, "")) 142 elif getattr(task.config, ConfigConst.configfile, ""): 143 self._reset_environment(config_file=task.config.get( 144 ConfigConst.configfile, "")) 145 146 # do with the count of repeat about a task 147 if getattr(task.config, ConfigConst.repeat, 0) > 0: 148 Scheduler.repeat_index = \ 149 getattr(task.config, ConfigConst.repeat) 150 drivers_list = list() 151 for index in range(-1, task.config.repeat): 152 repeat_list = self.construct_repeat_list(task, index) 153 if repeat_list: 154 drivers_list.extend(repeat_list) 155 task.test_drivers = drivers_list 156 else: 157 Scheduler.repeat_index = 0 158 159 self.test_number = len(task.test_drivers) 160 for des in task.root.children: 161 if des.error: 162 error_message = "{};{}".format(des.error.error_msg, error_message) 163 unavailable += 1 164 if error_message != "": 165 error_message = "test source '{}' or its config json not exists".format(error_message) 166 LOG.error("Exec task error: {}".format(error_message)) 167 168 raise ParamError(error_message) 169 170 if task.config.exectype == TestExecType.device_test: 171 self._device_test_execute(task) 172 elif task.config.exectype == TestExecType.host_test: 173 self._host_test_execute(task) 174 else: 175 LOG.info("Exec type %s is bypassed" % task.config.exectype) 176 177 except (ParamError, ValueError, TypeError, SyntaxError, AttributeError, 178 DeviceError, LiteDeviceError, ExecuteTerminate) as exception: 179 error_no = getattr(exception, "error_no", "") 180 error_message = "%s[%s]" % (str(exception), error_no) \ 181 if error_no else str(exception) 182 error_no = error_no if error_no else "00000" 183 LOG.exception(exception, exc_info=False, error_no=error_no) 184 finally: 185 Scheduler.reset_test_dict_source() 186 if getattr(task.config, ConfigConst.test_environment, "") or \ 187 getattr(task.config, ConfigConst.configfile, ""): 188 self._restore_environment() 189 190 Scheduler.call_life_stage_action(stage=LifeStage.task_end, 191 task=task, error=error_message, 192 unavailable=unavailable) 193 if Scheduler.upload_address: 194 Scheduler.upload_task_result(task, error_message) 195 Scheduler.upload_report_end() 196 197 def _device_test_execute(self, task): 198 used_devices = {} 199 try: 200 self._dynamic_concurrent_execute(task, used_devices) 201 finally: 202 Scheduler.__reset_environment__(used_devices) 203 # generate reports 204 self._generate_task_report(task, used_devices) 205 206 def _host_test_execute(self, task): 207 """Execute host test""" 208 try: 209 # initial params 210 current_driver_threads = {} 211 test_drivers = task.test_drivers 212 message_queue = queue.Queue() 213 214 # execute test drivers 215 self.queue_monitor_thread = self._start_queue_monitor( 216 message_queue, test_drivers, current_driver_threads) 217 while test_drivers: 218 if len(current_driver_threads) > 5: 219 time.sleep(3) 220 continue 221 222 # clear remaining test drivers when scheduler is terminated 223 if not Scheduler.is_execute: 224 LOG.info("Clear test drivers") 225 self._clear_not_executed(task, test_drivers) 226 break 227 228 # get test driver and device 229 test_driver = test_drivers[0] 230 231 # display executing progress 232 self._display_executing_process(None, test_driver, 233 test_drivers) 234 235 # start driver thread 236 self._start_driver_thread(current_driver_threads, ( 237 None, message_queue, task, test_driver)) 238 test_drivers.pop(0) 239 240 # wait for all drivers threads finished and do kit teardown 241 while True: 242 if not self.queue_monitor_thread.is_alive(): 243 break 244 time.sleep(3) 245 246 finally: 247 # generate reports 248 self._generate_task_report(task) 249 250 def _dry_run_device_test_execute(self, task): 251 try: 252 # initial params 253 used_devices = {} 254 current_driver_threads = {} 255 test_drivers = task.test_drivers 256 message_queue = queue.Queue() 257 task_unused_env = [] 258 259 # execute test drivers 260 self.queue_monitor_thread = self._start_queue_monitor( 261 message_queue, test_drivers, current_driver_threads) 262 while test_drivers: 263 # clear remaining test drivers when scheduler is terminated 264 if not Scheduler.is_execute: 265 LOG.info("Clear test drivers") 266 self._clear_not_executed(task, test_drivers) 267 break 268 269 # get test driver and device 270 test_driver = test_drivers[0] 271 # get environment 272 try: 273 environment = self.__allocate_environment__( 274 task.config.__dict__, test_driver) 275 except DeviceError as exception: 276 self._handle_device_error(exception, task, test_drivers) 277 continue 278 279 if not Scheduler.is_execute: 280 if environment: 281 Scheduler.__free_environment__(environment) 282 continue 283 284 # start driver thread 285 thread_id = self._get_thread_id(current_driver_threads) 286 driver_thread = DriversDryRunThread(test_driver, task, 287 environment, 288 message_queue) 289 driver_thread.setDaemon(True) 290 driver_thread.set_thread_id(thread_id) 291 driver_thread.start() 292 current_driver_threads.setdefault(thread_id, driver_thread) 293 294 test_drivers.pop(0) 295 296 # wait for all drivers threads finished and do kit teardown 297 while True: 298 if not self.queue_monitor_thread.is_alive(): 299 break 300 time.sleep(3) 301 302 self._do_taskkit_teardown(used_devices, task_unused_env) 303 finally: 304 LOG.debug( 305 "Removing report_path: {}".format(task.config.report_path)) 306 # delete reports 307 self.stop_task_logcat() 308 self.stop_encrypt_log() 309 shutil.rmtree(task.config.report_path) 310 311 def _generate_task_report(self, task, used_devices=None): 312 task_info = ExecInfo() 313 test_type = getattr(task.config, "testtype", []) 314 task_name = getattr(task.config, "task", "") 315 if task_name: 316 task_info.test_type = str(task_name).upper() 317 else: 318 task_info.test_type = ",".join(test_type) if test_type else "Test" 319 if used_devices: 320 serials = [] 321 platforms = [] 322 test_labels = [] 323 for serial, device in used_devices.items(): 324 serials.append(convert_serial(serial)) 325 platform = str(device.label).capitalize() 326 test_label = str(device.label).capitalize() 327 if platform not in platforms: 328 platforms.append(platform) 329 if test_label not in test_labels: 330 test_labels.append(test_label) 331 task_info.device_name = ",".join(serials) 332 task_info.platform = ",".join(platforms) 333 task_info.device_label = ",".join(test_labels) 334 else: 335 task_info.device_name = "None" 336 task_info.platform = "None" 337 task_info.device_label = "None" 338 task_info.test_time = task.config.start_time 339 task_info.product_info = getattr(task, "product_info", "") 340 341 listeners = self._create_listeners(task) 342 for listener in listeners: 343 listener.__ended__(LifeCycle.TestTask, task_info, 344 test_type=task_info.test_type, task=task) 345 346 @classmethod 347 def _create_listeners(cls, task): 348 listeners = [] 349 # append log listeners 350 log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log) 351 for log_listener in log_listeners: 352 log_listener_instance = log_listener.__class__() 353 listeners.append(log_listener_instance) 354 # append report listeners 355 report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report) 356 for report_listener in report_listeners: 357 report_listener_instance = report_listener.__class__() 358 setattr(report_listener_instance, "report_path", 359 task.config.report_path) 360 listeners.append(report_listener_instance) 361 # append upload listeners 362 upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload) 363 for upload_listener in upload_listeners: 364 upload_listener_instance = upload_listener.__class__() 365 listeners.append(upload_listener_instance) 366 return listeners 367 368 @staticmethod 369 def _find_device_options(environment_config, options, test_source): 370 devices_option = [] 371 index = 1 372 for device_dict in environment_config: 373 label = device_dict.get("label", "") 374 required_manager = device_dict.get("type", "device") 375 required_manager = \ 376 required_manager if required_manager else "device" 377 if not label: 378 continue 379 device_option = DeviceSelectionOption(options, label, test_source) 380 device_dict.pop("type", None) 381 device_dict.pop("label", None) 382 device_option.required_manager = required_manager 383 device_option.extend_value = device_dict 384 device_option.source_file = \ 385 test_source.config_file or test_source.source_string 386 if hasattr(device_option, "env_index"): 387 device_option.env_index = index 388 index += 1 389 devices_option.append(device_option) 390 return devices_option 391 392 def __allocate_environment__(self, options, test_driver): 393 device_options = self.get_device_options(options, 394 test_driver[1].source) 395 environment = None 396 env_manager = EnvironmentManager() 397 while True: 398 if not Scheduler.is_execute: 399 break 400 if self.queue_monitor_thread and \ 401 not self.queue_monitor_thread.is_alive(): 402 LOG.error("Queue monitor thread is dead.") 403 break 404 environment = env_manager.apply_environment(device_options) 405 if len(environment.devices) == len(device_options): 406 return environment 407 else: 408 env_manager.release_environment(environment) 409 LOG.debug("'%s' is waiting available device", 410 test_driver[1].source.test_name) 411 if env_manager.check_device_exist(device_options): 412 continue 413 else: 414 LOG.debug("'%s' required %s devices, actually %s devices" 415 " were found" % (test_driver[1].source.test_name, 416 len(device_options), 417 len(environment.devices))) 418 raise DeviceError("The '%s' required device does not exist" 419 % test_driver[1].source.source_file, 420 error_no="00104") 421 422 return environment 423 424 @classmethod 425 def get_device_options(cls, options, test_source): 426 device_options = [] 427 config_file = test_source.config_file 428 environment_config = [] 429 from _core.testkit.json_parser import JsonParser 430 if test_source.source_string and is_config_str( 431 test_source.source_string): 432 json_config = JsonParser(test_source.source_string) 433 environment_config = json_config.get_environment() 434 device_options = cls._find_device_options( 435 environment_config, options, test_source) 436 elif config_file and os.path.exists(config_file): 437 json_config = JsonParser(test_source.config_file) 438 environment_config = json_config.get_environment() 439 device_options = cls._find_device_options( 440 environment_config, options, test_source) 441 442 device_options = cls._calculate_device_options( 443 device_options, environment_config, options, test_source) 444 445 if ConfigConst.component_mapper in options.keys(): 446 required_component = options.get(ConfigConst.component_mapper). \ 447 get(test_source.module_name, None) 448 for device_option in device_options: 449 device_option.required_component = required_component 450 return device_options 451 452 @staticmethod 453 def __free_environment__(environment): 454 env_manager = EnvironmentManager() 455 env_manager.release_environment(environment) 456 457 @staticmethod 458 def __reset_environment__(used_devices): 459 env_manager = EnvironmentManager() 460 env_manager.reset_environment(used_devices) 461 462 @classmethod 463 def _check_device_spt(cls, kit, driver_request, device): 464 kit_spt = cls._parse_property_value(ConfigConst.spt, 465 driver_request, kit) 466 if not kit_spt: 467 setattr(device, ConfigConst.task_state, False) 468 LOG.error("Spt is empty", error_no="00108") 469 return 470 if getattr(driver_request, ConfigConst.product_info, ""): 471 product_info = getattr(driver_request, 472 ConfigConst.product_info) 473 if not isinstance(product_info, dict): 474 LOG.warning("Product info should be dict, %s", 475 product_info) 476 setattr(device, ConfigConst.task_state, False) 477 return 478 device_spt = product_info.get("Security Patch", None) 479 if not device_spt or not \ 480 Scheduler.compare_spt_time(kit_spt, device_spt): 481 LOG.error("The device %s spt is %s, " 482 "and the test case spt is %s, " 483 "which does not meet the requirements" % 484 (device.device_sn, device_spt, kit_spt), 485 error_no="00116") 486 setattr(device, ConfigConst.task_state, False) 487 return 488 489 def _decc_task_setup(self, environment, task): 490 config = Config() 491 config.update(task.config.__dict__) 492 config.environment = environment 493 driver_request = Request(config=config) 494 495 if environment is None: 496 return False 497 498 for device in environment.devices: 499 if not getattr(device, ConfigConst.need_kit_setup, True): 500 LOG.debug("Device %s need kit setup is false" % device) 501 continue 502 503 # do task setup for device 504 kits_copy = copy.deepcopy(task.config.kits) 505 setattr(device, ConfigConst.task_kits, kits_copy) 506 for kit in getattr(device, ConfigConst.task_kits, []): 507 if not Scheduler.is_execute: 508 break 509 try: 510 kit.__setup__(device, request=driver_request) 511 except (ParamError, ExecuteTerminate, DeviceError, 512 LiteDeviceError, ValueError, TypeError, 513 SyntaxError, AttributeError) as exception: 514 error_no = getattr(exception, "error_no", "00000") 515 LOG.exception( 516 "Task setup device: %s, exception: %s" % ( 517 environment.__get_serial__(), 518 exception), exc_info=False, error_no=error_no) 519 if kit.__class__.__name__ == CKit.query and \ 520 device.label in [DeviceLabelType.ipcamera]: 521 self._check_device_spt(kit, driver_request, device) 522 LOG.debug("Set device %s need kit setup to false" % device) 523 setattr(device, ConfigConst.need_kit_setup, False) 524 525 for device in environment.devices: 526 if not getattr(device, ConfigConst.task_state, True): 527 return False 528 529 # set product_info to self.task 530 if getattr(driver_request, ConfigConst.product_info, "") and \ 531 not getattr(task, ConfigConst.product_info, ""): 532 product_info = getattr(driver_request, ConfigConst.product_info) 533 if not isinstance(product_info, dict): 534 LOG.warning("Product info should be dict, %s", 535 product_info) 536 else: 537 setattr(task, ConfigConst.product_info, product_info) 538 return True 539 540 def _dynamic_concurrent_execute(self, task, used_devices): 541 # initial params 542 current_driver_threads = {} 543 test_drivers = task.test_drivers 544 message_queue = queue.Queue() 545 task_unused_env = [] 546 547 # execute test drivers 548 self.queue_monitor_thread = self._start_queue_monitor( 549 message_queue, test_drivers, current_driver_threads) 550 while test_drivers: 551 # clear remaining test drivers when scheduler is terminated 552 if not Scheduler.is_execute: 553 LOG.info("Clear test drivers") 554 self._clear_not_executed(task, test_drivers) 555 break 556 557 # get test driver and device 558 test_driver = test_drivers[0] 559 560 # call life stage 561 Scheduler.call_life_stage_action(stage=LifeStage.case_start, 562 case_name=test_driver[1].source.module_name) 563 564 if getattr(task.config, ConfigConst.history_report_path, ""): 565 module_name = test_driver[1].source.module_name 566 if not self.is_module_need_retry(task, module_name): 567 self._display_executing_process(None, test_driver, 568 test_drivers) 569 LOG.info("%s are passed, no need to retry" % module_name) 570 self._append_history_result(task, module_name) 571 LOG.info("") 572 test_drivers.pop(0) 573 continue 574 575 if getattr(task.config, ConfigConst.component_mapper, ""): 576 module_name = test_driver[1].source.module_name 577 self.component_task_setup(task, module_name) 578 579 # get environment 580 try: 581 environment = self.__allocate_environment__( 582 task.config.__dict__, test_driver) 583 except DeviceError as exception: 584 self._handle_device_error(exception, task, test_drivers) 585 Scheduler.call_life_stage_action(stage=LifeStage.case_end, 586 case_name=test_driver[1].source.module_name, 587 case_result="Failed", 588 error_msg=exception.args) 589 continue 590 591 if not self.queue_monitor_thread.is_alive(): 592 LOG.debug("Restart queue monitor thread.") 593 current_driver_threads = {} 594 message_queue = queue.Queue() 595 self.queue_monitor_thread = self._start_queue_monitor( 596 message_queue, test_drivers, current_driver_threads) 597 continue 598 599 if not Scheduler.is_execute: 600 if environment: 601 Scheduler.__free_environment__(environment) 602 continue 603 604 if check_mode(ModeType.decc) or getattr( 605 task.config, ConfigConst.check_device, False): 606 LOG.info("Start to check environment: %s" % 607 environment.__get_serial__()) 608 status = self._decc_task_setup(environment, task) 609 if not status: 610 Scheduler.__free_environment__(environment) 611 task_unused_env.append(environment) 612 error_message = "Load Error[00116]" 613 self.report_not_executed(task.config.report_path, 614 [test_drivers[0]], 615 error_message, task) 616 test_drivers.pop(0) 617 continue 618 else: 619 LOG.info("Environment %s check success", 620 environment.__get_serial__()) 621 622 # display executing progress 623 self._display_executing_process(environment, test_driver, 624 test_drivers) 625 626 # add to used devices and set need_kit_setup attribute 627 self._append_used_devices(environment, used_devices) 628 629 # start driver thread 630 self._start_driver_thread(current_driver_threads, ( 631 environment, message_queue, task, test_driver)) 632 test_drivers.pop(0) 633 634 # wait for all drivers threads finished and do kit teardown 635 while True: 636 if not self.queue_monitor_thread.is_alive(): 637 break 638 time.sleep(3) 639 640 self._do_taskkit_teardown(used_devices, task_unused_env) 641 642 @classmethod 643 def _append_history_result(cls, task, module_name): 644 history_report_path = getattr( 645 task.config, ConfigConst.history_report_path, "") 646 from _core.report.result_reporter import ResultReporter 647 params = ResultReporter.get_task_info_params( 648 history_report_path) 649 650 if not params or not params[ReportConst.data_reports]: 651 LOG.debug("Task info record data reports is empty") 652 return 653 654 report_data_dict = dict(params[ReportConst.data_reports]) 655 if module_name not in report_data_dict.keys(): 656 module_name_ = str(module_name).split(".")[0] 657 if module_name_ not in report_data_dict.keys(): 658 LOG.error("%s not in data reports" % module_name) 659 return 660 module_name = module_name_ 661 662 from xdevice import SuiteReporter 663 if check_mode(ModeType.decc): 664 virtual_report_path, report_result = SuiteReporter. \ 665 get_history_result_by_module(module_name) 666 LOG.debug("Append history result: (%s, %s)" % ( 667 virtual_report_path, report_result)) 668 SuiteReporter.append_report_result( 669 (virtual_report_path, report_result)) 670 else: 671 history_execute_result = report_data_dict.get(module_name, "") 672 LOG.info("Start copy %s" % history_execute_result) 673 file_name = get_filename_extension(history_execute_result)[0] 674 if os.path.exists(history_execute_result): 675 result_dir = \ 676 os.path.join(task.config.report_path, "result") 677 os.makedirs(result_dir, exist_ok=True) 678 target_execute_result = "%s.xml" % os.path.join( 679 task.config.report_path, "result", file_name) 680 shutil.copyfile(history_execute_result, target_execute_result) 681 LOG.info("Copy %s to %s" % ( 682 history_execute_result, target_execute_result)) 683 else: 684 error_msg = "Copy failed! %s not exists!" % \ 685 history_execute_result 686 raise ParamError(error_msg) 687 688 def _handle_device_error(self, exception, task, test_drivers): 689 self._display_executing_process(None, test_drivers[0], test_drivers) 690 error_message = "%s: %s" % \ 691 (get_instance_name(exception), exception) 692 LOG.exception(error_message, exc_info=False, 693 error_no=exception.error_no) 694 if check_mode(ModeType.decc): 695 error_message = "Load Error[00104]" 696 self.report_not_executed(task.config.report_path, [test_drivers[0]], 697 error_message, task) 698 699 LOG.info("") 700 test_drivers.pop(0) 701 702 @classmethod 703 def _clear_not_executed(cls, task, test_drivers): 704 if Scheduler.mode != ModeType.decc: 705 # clear all 706 test_drivers.clear() 707 return 708 # The result is reported only in DECC mode, and also clear all. 709 LOG.error("Case no run: task execution terminated!", error_no="00300") 710 error_message = "Execute Terminate[00300]" 711 cls.report_not_executed(task.config.report_path, test_drivers, 712 error_message) 713 test_drivers.clear() 714 715 @classmethod 716 def report_not_executed(cls, report_path, test_drivers, error_message, 717 task=None): 718 # traversing list to get remained elements 719 for test_driver in test_drivers: 720 # get report file 721 if task and getattr(task.config, "testdict", ""): 722 report_file = os.path.join(get_sub_path( 723 test_driver[1].source.source_file), 724 "%s.xml" % test_driver[1].source.test_name) 725 else: 726 report_file = os.path.join( 727 report_path, "result", 728 "%s.xml" % test_driver[1].source.module_name) 729 730 # get report name 731 report_name = test_driver[1].source.test_name if \ 732 not test_driver[1].source.test_name.startswith("{") \ 733 else "report" 734 735 # get module name 736 module_name = test_driver[1].source.module_name 737 738 # here, normally create empty report and then upload result 739 check_result_report(report_path, report_file, error_message, 740 report_name, module_name) 741 742 def _start_driver_thread(self, current_driver_threads, thread_params): 743 environment, message_queue, task, test_driver = thread_params 744 thread_id = self._get_thread_id(current_driver_threads) 745 driver_thread = DriversThread(test_driver, task, environment, 746 message_queue) 747 driver_thread.setDaemon(True) 748 driver_thread.set_thread_id(thread_id) 749 driver_thread.set_listeners(self._create_listeners(task)) 750 driver_thread.start() 751 current_driver_threads.setdefault(thread_id, driver_thread) 752 753 @classmethod 754 def _do_taskkit_teardown(cls, used_devices, task_unused_env): 755 for device in used_devices.values(): 756 if getattr(device, ConfigConst.need_kit_setup, True): 757 continue 758 759 for kit in getattr(device, ConfigConst.task_kits, []): 760 try: 761 kit.__teardown__(device) 762 except Exception as error: 763 LOG.debug("Do task kit teardown: %s" % error) 764 setattr(device, ConfigConst.task_kits, []) 765 setattr(device, ConfigConst.need_kit_setup, True) 766 767 for environment in task_unused_env: 768 for device in environment.devices: 769 setattr(device, ConfigConst.task_state, True) 770 setattr(device, ConfigConst.need_kit_setup, True) 771 772 def _display_executing_process(self, environment, test_driver, 773 test_drivers): 774 source_content = test_driver[1].source.source_file or \ 775 test_driver[1].source.source_string 776 if environment is None: 777 LOG.info("[%d / %d] Executing: %s, Driver: %s" % 778 (self.test_number - len(test_drivers) + 1, 779 self.test_number, source_content, 780 test_driver[1].source.test_type)) 781 return 782 783 LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" % 784 (self.test_number - len(test_drivers) + 1, 785 self.test_number, source_content, 786 environment.__get_serial__(), 787 test_driver[1].source.test_type)) 788 789 @classmethod 790 def _get_thread_id(cls, current_driver_threads): 791 thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f') 792 while thread_id in current_driver_threads.keys(): 793 thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f') 794 return thread_id 795 796 @classmethod 797 def _append_used_devices(cls, environment, used_devices): 798 if environment is not None: 799 for device in environment.devices: 800 device_serial = device.__get_serial__() if device else "None" 801 if device_serial and device_serial not in used_devices.keys(): 802 used_devices[device_serial] = device 803 804 @staticmethod 805 def _start_queue_monitor(message_queue, test_drivers, 806 current_driver_threads): 807 queue_monitor_thread = QueueMonitorThread(message_queue, 808 current_driver_threads, 809 test_drivers) 810 queue_monitor_thread.setDaemon(True) 811 queue_monitor_thread.start() 812 return queue_monitor_thread 813 814 def exec_command(self, command, options): 815 """ 816 Directly executes a command without adding it to the command queue. 817 """ 818 if command != "run": 819 raise ParamError("unsupported command action: %s" % command, 820 error_no="00100") 821 exec_type = options.exectype 822 if exec_type in [TestExecType.device_test, TestExecType.host_test, 823 TestExecType.host_driven_test]: 824 self._exec_task(options) 825 else: 826 LOG.error("Unsupported execution type '%s'" % exec_type, 827 error_no="00100") 828 829 return 830 831 def _exec_task(self, options): 832 """ 833 Directly allocates a device and execute a device test. 834 """ 835 try: 836 self.check_auto_retry(options) 837 task = self.__discover__(options.__dict__) 838 self.__execute__(task) 839 except (ParamError, ValueError, TypeError, SyntaxError, 840 AttributeError) as exception: 841 error_no = getattr(exception, "error_no", "00000") 842 LOG.exception("%s: %s" % (get_instance_name(exception), exception), 843 exc_info=False, error_no=error_no) 844 if Scheduler.upload_address: 845 Scheduler.upload_unavailable_result(str(exception.args)) 846 Scheduler.upload_report_end() 847 finally: 848 self.stop_task_logcat() 849 self.stop_encrypt_log() 850 self.start_auto_retry() 851 852 @classmethod 853 def _reset_environment(cls, environment="", config_file=""): 854 env_manager = EnvironmentManager() 855 env_manager.env_stop() 856 EnvironmentManager(environment, config_file) 857 858 @classmethod 859 def _restore_environment(cls): 860 env_manager = EnvironmentManager() 861 env_manager.env_stop() 862 EnvironmentManager() 863 864 @classmethod 865 def start_task_log(cls, log_path): 866 tool_file_name = "task_log.log" 867 tool_log_file = os.path.join(log_path, tool_file_name) 868 add_task_file_handler(tool_log_file) 869 870 @classmethod 871 def start_encrypt_log(cls, log_path): 872 from _core.report.encrypt import check_pub_key_exist 873 if check_pub_key_exist(): 874 encrypt_file_name = "task_log.ept" 875 encrypt_log_file = os.path.join(log_path, encrypt_file_name) 876 add_encrypt_file_handler(encrypt_log_file) 877 878 @classmethod 879 def stop_task_logcat(cls): 880 remove_task_file_handler() 881 882 @classmethod 883 def stop_encrypt_log(cls): 884 remove_encrypt_file_handler() 885 886 @staticmethod 887 def _find_test_root_descriptor(config): 888 if getattr(config, ConfigConst.task, None) or \ 889 getattr(config, ConfigConst.testargs, None): 890 Scheduler._pre_component_test(config) 891 892 if getattr(config, ConfigConst.subsystems, "") or \ 893 getattr(config, ConfigConst.parts, "") or \ 894 getattr(config, ConfigConst.component_base_kit, ""): 895 uid = unique_id("Scheduler", "component") 896 if config.subsystems or config.parts: 897 test_set = (config.subsystems, config.parts) 898 else: 899 kit = getattr(config, ConfigConst.component_base_kit) 900 test_set = kit.get_white_list() 901 902 root = Descriptor(uuid=uid, name="component", 903 source=TestSetSource(test_set), 904 con=True) 905 906 root.children = find_test_descriptors(config) 907 return root 908 # read test list from testdict 909 if getattr(config, ConfigConst.testdict, "") != "" and getattr( 910 config, ConfigConst.testfile, "") == "": 911 uid = unique_id("Scheduler", "testdict") 912 root = Descriptor(uuid=uid, name="testdict", 913 source=TestSetSource(config.testdict), 914 con=True) 915 root.children = find_testdict_descriptors(config) 916 return root 917 918 # read test list from testfile, testlist or task 919 test_set = getattr(config, ConfigConst.testfile, "") or getattr( 920 config, ConfigConst.testlist, "") or getattr( 921 config, ConfigConst.task, "") or getattr( 922 config, ConfigConst.testcase) 923 if test_set: 924 fname, _ = get_filename_extension(test_set) 925 uid = unique_id("Scheduler", fname) 926 root = Descriptor(uuid=uid, name=fname, 927 source=TestSetSource(test_set), con=True) 928 root.children = find_test_descriptors(config) 929 return root 930 else: 931 raise ParamError("no test file, list, dict, case or task found", 932 error_no="00102") 933 934 @classmethod 935 def terminate_cmd_exec(cls): 936 Scheduler.is_execute = False 937 Scheduler.repeat_index = 0 938 Scheduler.auto_retry = -1 939 LOG.info("Start to terminate execution") 940 return Scheduler.terminate_result.get() 941 942 @classmethod 943 def upload_case_result(cls, upload_param): 944 if not Scheduler.upload_address: 945 return 946 case_id, result, error, start_time, end_time, report_path = \ 947 upload_param 948 if error and len(error) > MAX_VISIBLE_LENGTH: 949 error = "%s..." % error[:MAX_VISIBLE_LENGTH] 950 LOG.info( 951 "Get upload params: %s, %s, %s, %s, %s, %s" % ( 952 case_id, result, error, start_time, end_time, report_path)) 953 if Scheduler.proxy is not None: 954 Scheduler.proxy.upload_result(case_id, result, error, start_time, 955 end_time, report_path) 956 else: 957 LOG.debug("There is no proxy, can't upload case result") 958 959 @classmethod 960 def upload_module_result(cls, exec_message): 961 if not Scheduler.is_execute: 962 return 963 result_file = exec_message.get_result() 964 request = exec_message.get_request() 965 test_name = request.root.source.test_name 966 if not result_file or not os.path.exists(result_file): 967 LOG.error("%s result not exists", test_name, error_no="00200") 968 return 969 970 test_type = request.root.source.test_type 971 LOG.info("Need upload result: %s, test type: %s" % 972 (result_file, test_type)) 973 upload_params, _, _ = cls._get_upload_params(result_file, request) 974 if not upload_params: 975 LOG.error("%s no test case result to upload" % result_file, 976 error_no="00201") 977 return 978 LOG.info("Need upload %s case" % len(upload_params)) 979 upload_suite = [] 980 for upload_param in upload_params: 981 case_id, result, error, start_time, end_time, report_path = \ 982 upload_param 983 case = {"caseid": case_id, "result": result, "error": error, 984 "start": start_time, "end": end_time, 985 "report": report_path} 986 LOG.info("Case info: %s", case) 987 upload_suite.append(case) 988 if Scheduler.proxy is not None: 989 Scheduler.proxy.upload_batch(upload_suite) 990 else: 991 LOG.debug("There is no proxy, can't upload module result") 992 993 @classmethod 994 def _get_upload_params(cls, result_file, request): 995 upload_params = [] 996 report_path = result_file 997 testsuites_element = DataHelper.parse_data_report(report_path) 998 start_time, end_time = cls._get_time(testsuites_element) 999 test_type = request.get_test_type() 1000 if test_type == HostDrivenTestType.device_test or test_type == HostDrivenTestType.windows_test: 1001 for model_element in testsuites_element: 1002 case_id = model_element.get(ReportConstant.name, "") 1003 case_result, error = cls.get_script_result(model_element) 1004 if error and len(error) > MAX_VISIBLE_LENGTH: 1005 error = "{}...".format(error[:MAX_VISIBLE_LENGTH]) 1006 report = cls._get_report_path( 1007 request.config.report_path, 1008 model_element.get(ReportConstant.report, "")) 1009 upload_params.append( 1010 (case_id, case_result, error, start_time, end_time, report,)) 1011 else: 1012 for testsuite_element in testsuites_element: 1013 if check_mode(ModeType.developer): 1014 module_name = str(get_filename_extension( 1015 report_path)[0]).split(".")[0] 1016 else: 1017 module_name = testsuite_element.get(ReportConstant.name, 1018 "none") 1019 for case_element in testsuite_element: 1020 case_id = cls._get_case_id(case_element, module_name) 1021 case_result, error = cls._get_case_result(case_element) 1022 if case_result == "Ignored": 1023 LOG.info( 1024 "Get upload params: {} result is ignored".format(case_id)) 1025 continue 1026 if error and len(error) > MAX_VISIBLE_LENGTH: 1027 error = "{}...".format(error[:MAX_VISIBLE_LENGTH]) 1028 report = cls._get_report_path( 1029 request.config.report_path, 1030 case_element.get(ReportConstant.report, "")) 1031 upload_params.append( 1032 (case_id, case_result, error, start_time, end_time, report,)) 1033 return upload_params, start_time, end_time 1034 1035 @classmethod 1036 def get_script_result(cls, model_element): 1037 disabled = int(model_element.get(ReportConstant.disabled)) if \ 1038 model_element.get(ReportConstant.disabled, "") else 0 1039 failures = int(model_element.get(ReportConstant.failures)) if \ 1040 model_element.get(ReportConstant.failures, "") else 0 1041 errors = int(model_element.get(ReportConstant.errors)) if \ 1042 model_element.get(ReportConstant.errors, "") else 0 1043 unavailable = int(model_element.get(ReportConstant.unavailable)) if \ 1044 model_element.get(ReportConstant.unavailable, "") else 0 1045 if failures > 0 or errors > 0: 1046 result = "Failed" 1047 elif disabled > 0 or unavailable > 0: 1048 result = "Unavailable" 1049 else: 1050 result = "Passed" 1051 1052 if result == "Passed": 1053 return result, "" 1054 if Scheduler.mode == ModeType.decc: 1055 result = "Failed" 1056 result_kind = model_element.get(ReportConstant.result_kind, "") 1057 if result_kind: 1058 result = result_kind 1059 1060 error_msg = model_element.get(ReportConstant.message, "") 1061 if not error_msg and len(model_element) > 0: 1062 error_msg = model_element[0].get(ReportConstant.message, "") 1063 if not error_msg and len(model_element[0]) > 0: 1064 error_msg = model_element[0][0].get(ReportConstant.message, "") 1065 return result, error_msg 1066 1067 @classmethod 1068 def _get_case_id(cls, case_element, package_name): 1069 class_name = case_element.get(ReportConstant.class_name, "none") 1070 method_name = case_element.get(ReportConstant.name, "none") 1071 case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name, 1072 class_name, method_name) 1073 return case_id 1074 1075 @classmethod 1076 def _get_case_result(cls, case_element): 1077 # get result 1078 case = Case() 1079 case.status = case_element.get(ReportConstant.status, "") 1080 case.result = case_element.get(ReportConstant.result, "") 1081 if case_element.get(ReportConstant.message, ""): 1082 case.message = case_element.get(ReportConstant.message) 1083 if len(case_element) > 0: 1084 if not case.result: 1085 case.result = ReportConstant.false 1086 case.message = case_element[0].get(ReportConstant.message) 1087 if case.is_passed(): 1088 result = "Passed" 1089 elif case.is_failed(): 1090 result = "Failed" 1091 elif case.is_blocked(): 1092 result = "Blocked" 1093 elif case.is_ignored(): 1094 result = "Ignored" 1095 elif case.is_completed(): 1096 if case.message: 1097 result = "Failed" 1098 else: 1099 result = "Passed" 1100 else: 1101 result = "Unavailable" 1102 return result, case.message 1103 1104 @classmethod 1105 def _get_time(cls, testsuite_element): 1106 start_time = testsuite_element.get(ReportConstant.start_time, "") 1107 end_time = testsuite_element.get(ReportConstant.end_time, "") 1108 try: 1109 if start_time and end_time: 1110 start_time = int(time.mktime(time.strptime( 1111 start_time, ReportConstant.time_format)) * 1000) 1112 end_time = int(time.mktime(time.strptime( 1113 end_time, ReportConstant.time_format)) * 1000) 1114 else: 1115 timestamp = str(testsuite_element.get( 1116 ReportConstant.time_stamp, "")).replace("T", " ") 1117 cost_time = testsuite_element.get(ReportConstant.time, "") 1118 if timestamp and cost_time: 1119 try: 1120 end_time = int(time.mktime(time.strptime( 1121 timestamp, ReportConstant.time_format)) * 1000) 1122 except ArithmeticError as error: 1123 LOG.error("Get time error {}".format(error)) 1124 end_time = int(time.time() * 1000) 1125 except ValueError as error: 1126 LOG.error("Get time error {}".format(error)) 1127 end_time = int(time.mktime(time.strptime( 1128 timestamp.split(".")[0], ReportConstant.time_format)) * 1000) 1129 start_time = int(end_time - float(cost_time) * 1000) 1130 else: 1131 current_time = int(time.time() * 1000) 1132 start_time, end_time = current_time, current_time 1133 except ArithmeticError as error: 1134 LOG.error("Get time error {}".format(error)) 1135 current_time = int(time.time() * 1000) 1136 start_time, end_time = current_time, current_time 1137 return start_time, end_time 1138 1139 @classmethod 1140 def _get_report_path(cls, base_path, report=""): 1141 """ get report path 1142 base_path: str, report base path 1143 report : str, report relative path 1144 """ 1145 report_path = os.path.join(base_path, report) 1146 return report_path if report and os.path.exists(report_path) else base_path 1147 1148 @classmethod 1149 def upload_task_result(cls, task, error_message=""): 1150 if not Scheduler.task_name: 1151 LOG.info("No need upload summary report") 1152 return 1153 1154 summary_data_report = os.path.join(task.config.report_path, 1155 ReportConstant.summary_data_report) 1156 if not os.path.exists(summary_data_report): 1157 Scheduler.upload_unavailable_result(str( 1158 error_message) or "summary report not exists", 1159 task.config.report_path) 1160 return 1161 1162 task_element = ElementTree.parse(summary_data_report).getroot() 1163 start_time, end_time = cls._get_time(task_element) 1164 task_result = cls._get_task_result(task_element) 1165 error_msg = "" 1166 for child in task_element: 1167 if child.get(ReportConstant.message, ""): 1168 error_msg = "{}{}".format( 1169 error_msg, "%s;" % child.get(ReportConstant.message)) 1170 if error_msg: 1171 error_msg = error_msg[:-1] 1172 report = cls._get_report_path( 1173 task.config.report_path, ReportConstant.summary_vision_report) 1174 cls.upload_case_result( 1175 (Scheduler.task_name, task_result, error_msg, start_time, end_time, report)) 1176 1177 @classmethod 1178 def _get_task_result(cls, task_element): 1179 failures = int(task_element.get(ReportConstant.failures, 0)) 1180 errors = int(task_element.get(ReportConstant.errors, 0)) 1181 disabled = int(task_element.get(ReportConstant.disabled, 0)) 1182 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1183 if disabled > 0: 1184 task_result = "Blocked" 1185 elif errors > 0 or failures > 0: 1186 task_result = "Failed" 1187 elif unavailable > 0: 1188 task_result = "Unavailable" 1189 else: 1190 task_result = "Passed" 1191 return task_result 1192 1193 @classmethod 1194 def upload_unavailable_result(cls, error_msg, report_path=""): 1195 start_time = int(time.time() * 1000) 1196 Scheduler.upload_case_result((Scheduler.task_name, "Unavailable", 1197 error_msg, start_time, start_time, 1198 report_path)) 1199 1200 @classmethod 1201 def upload_report_end(cls): 1202 if getattr(cls, "tmp_json", None): 1203 os.remove(cls.tmp_json) 1204 del cls.tmp_json 1205 LOG.info("Upload report end") 1206 if Scheduler.proxy is not None: 1207 Scheduler.proxy.report_end() 1208 else: 1209 LOG.debug("There is no proxy, can't upload report end") 1210 1211 @classmethod 1212 def is_module_need_retry(cls, task, module_name): 1213 failed_flag = False 1214 if check_mode(ModeType.decc): 1215 from xdevice import SuiteReporter 1216 for module, _ in SuiteReporter.get_failed_case_list(): 1217 if module_name == module or str(module_name).split( 1218 ".")[0] == module: 1219 failed_flag = True 1220 break 1221 else: 1222 from xdevice import ResultReporter 1223 history_report_path = \ 1224 getattr(task.config, ConfigConst.history_report_path, "") 1225 params = ResultReporter.get_task_info_params(history_report_path) 1226 if params and params[ReportConst.unsuccessful_params]: 1227 if dict(params[ReportConst.unsuccessful_params]).get( 1228 module_name, []): 1229 failed_flag = True 1230 elif dict(params[ReportConst.unsuccessful_params]).get( 1231 str(module_name).split(".")[0], []): 1232 failed_flag = True 1233 return failed_flag 1234 1235 @classmethod 1236 def compare_spt_time(cls, kit_spt, device_spt): 1237 if not kit_spt or not device_spt: 1238 return False 1239 try: 1240 kit_time = str(kit_spt).split("-")[:2] 1241 device_time = str(device_spt).split("-")[:2] 1242 k_spt = datetime.datetime.strptime( 1243 "-".join(kit_time), "%Y-%m") 1244 d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m") 1245 except ValueError as value_error: 1246 LOG.debug("Date format is error, %s" % value_error.args) 1247 return False 1248 month_interval = int(k_spt.month) - int(d_spt.month) 1249 year_interval = int(k_spt.year) - int(d_spt.year) 1250 LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)" 1251 % (k_spt.year, k_spt.month, d_spt.year, d_spt.month)) 1252 if year_interval < 0: 1253 return True 1254 if year_interval == 0 and month_interval in range(-11, 3): 1255 return True 1256 if year_interval == 1 and month_interval + 12 in (1, 2): 1257 return True 1258 return False 1259 1260 @classmethod 1261 def _parse_property_value(cls, property_name, driver_request, kit): 1262 test_args = copy.deepcopy( 1263 driver_request.config.get(ConfigConst.testargs, dict())) 1264 property_value = "" 1265 if ConfigConst.pass_through in test_args.keys(): 1266 pt_dict = json.loads(test_args.get(ConfigConst.pass_through, "")) 1267 property_value = pt_dict.get(property_name, None) 1268 elif property_name in test_args.keys: 1269 property_value = test_args.get(property_name, None) 1270 return property_value if property_value else \ 1271 kit.properties.get(property_name, None) 1272 1273 @classmethod 1274 def _calculate_device_options(cls, device_options, environment_config, 1275 options, test_source): 1276 # calculate difference 1277 diff_value = len(environment_config) - len(device_options) 1278 if device_options and diff_value == 0: 1279 return device_options 1280 1281 else: 1282 diff_value = diff_value if diff_value else 1 1283 if str(test_source.source_file).endswith(".bin"): 1284 device_option = DeviceSelectionOption( 1285 options, DeviceLabelType.ipcamera, test_source) 1286 else: 1287 device_option = DeviceSelectionOption( 1288 options, None, test_source) 1289 1290 device_option.source_file = \ 1291 test_source.source_file or test_source.source_string 1292 device_option.required_manager = "device" 1293 device_options.extend([device_option] * diff_value) 1294 LOG.debug("Assign device options and it's length is %s" 1295 % len(device_options)) 1296 return device_options 1297 1298 @classmethod 1299 def update_test_type_in_source(cls, key, value): 1300 LOG.debug("update test type dict in source") 1301 TestDictSource.test_type[key] = value 1302 1303 @classmethod 1304 def update_ext_type_in_source(cls, key, value): 1305 LOG.debug("update ext type dict in source") 1306 TestDictSource.exe_type[key] = value 1307 1308 @classmethod 1309 def clear_test_dict_source(cls): 1310 TestDictSource.clear() 1311 1312 @classmethod 1313 def reset_test_dict_source(cls): 1314 TestDictSource.reset() 1315 1316 @classmethod 1317 def _pre_component_test(cls, config): 1318 if not config.kits: 1319 return 1320 cur_kit = None 1321 for kit in config.kits: 1322 if kit.__class__.__name__ == CKit.component: 1323 cur_kit = kit 1324 break 1325 if not cur_kit: 1326 return 1327 get_white_list = getattr(cur_kit, "get_white_list", None) 1328 if not callable(get_white_list): 1329 return 1330 subsystems, parts = get_white_list() 1331 if not subsystems and not parts: 1332 return 1333 setattr(config, ConfigConst.component_base_kit, cur_kit) 1334 1335 @classmethod 1336 def component_task_setup(cls, task, module_name): 1337 component_kit = task.config.get(ConfigConst.component_base_kit, None) 1338 if not component_kit: 1339 # only -p -s .you do not care about the components that can be 1340 # supported. you only want to run the use cases of the current 1341 # component 1342 return 1343 LOG.debug("Start component task setup") 1344 _component_mapper = task.config.get(ConfigConst.component_mapper) 1345 _subsystem, _part = _component_mapper.get(module_name) 1346 1347 is_hit = False 1348 # find in cache. if not find, update cache 1349 cache_subsystem, cache_part = component_kit.get_cache() 1350 if _subsystem in cache_subsystem or _part in cache_subsystem: 1351 is_hit = True 1352 if not is_hit: 1353 env_manager = EnvironmentManager() 1354 for _, manager in env_manager.managers.items(): 1355 if getattr(manager, "devices_list", []): 1356 for device in manager.devices_list: 1357 component_kit.__setup__(device) 1358 cache_subsystem, cache_part = component_kit.get_cache() 1359 if _subsystem in cache_subsystem or _part in cache_subsystem: 1360 is_hit = True 1361 if not is_hit: 1362 LOG.warning("%s are skipped, no suitable component found. " 1363 "Require subsystem=%s part=%s, no device match this" 1364 % (module_name, _subsystem, _part)) 1365 1366 @classmethod 1367 def construct_repeat_list(cls, task, index): 1368 repeat_list = list() 1369 for driver_index in range(len(task.test_drivers)): 1370 cur_test_driver = copy.deepcopy(task.test_drivers[driver_index]) 1371 desc = cur_test_driver[1] 1372 desc.unique_id = '{}_{}'.format(desc.unique_id, index + 1) 1373 repeat_list.append(cur_test_driver) 1374 return repeat_list 1375 1376 @classmethod 1377 def start_auto_retry(cls): 1378 if not Scheduler.is_need_auto_retry: 1379 Scheduler.auto_retry = -1 1380 LOG.debug("No need auto retry") 1381 return 1382 if Scheduler.auto_retry > 0: 1383 Scheduler.auto_retry -= 1 1384 if Scheduler.auto_retry == 0: 1385 Scheduler.auto_retry = -1 1386 from _core.command.console import Console 1387 console = Console() 1388 console.command_parser("run --retry") 1389 1390 @classmethod 1391 def check_auto_retry(cls, options): 1392 if Scheduler.auto_retry < 0 and \ 1393 int(getattr(options, ConfigConst.auto_retry, 0)) > 0: 1394 value = int(getattr(options, ConfigConst.auto_retry, 0)) 1395 Scheduler.auto_retry = value if value <= 10 else 10 1396 1397 @staticmethod 1398 def call_life_stage_action(**kwargs): 1399 """ 1400 call in different lift stage 1401 """ 1402 from xdevice import Task 1403 from xdevice import Variables 1404 if Task.life_stage_listener is None: 1405 return 1406 stage = kwargs.get("stage", None) 1407 data = dict() 1408 if stage == LifeStage.task_start: 1409 data = {"type": stage, "name": Variables.task_name} 1410 elif stage == LifeStage.task_end: 1411 task = kwargs.get("task", None) 1412 error = kwargs.get("error", "") 1413 unavailable = kwargs.get("unavailable", 0) 1414 summary_data_report = os.path.join(task.config.report_path, 1415 ReportConstant.summary_data_report) if task else "" 1416 if not os.path.exists(summary_data_report): 1417 LOG.error("Call lifecycle error, summary report {} not exists".format(task.config.report_path)) 1418 passed = failures = blocked = 0 1419 else: 1420 task_element = ElementTree.parse(summary_data_report).getroot() 1421 total_tests = int(task_element.get(ReportConstant.tests, 0)) 1422 failures = int(task_element.get(ReportConstant.failures, 0)) 1423 blocked = int(task_element.get(ReportConstant.disabled, 0)) 1424 ignored = int(task_element.get(ReportConstant.ignored, 0)) 1425 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1426 passed = total_tests - failures - blocked - ignored 1427 data = {"type": stage, "name": Variables.task_name, 1428 "passed": passed, "failures": failures, 1429 "blocked": blocked, "unavailable": unavailable, 1430 "error": error} 1431 elif stage == LifeStage.case_start: 1432 case_name = kwargs.get("case_name", "") 1433 data = {"type": stage, "name": case_name} 1434 elif stage == LifeStage.case_end: 1435 case_name = kwargs.get("case_name", "") 1436 case_result = kwargs.get("case_result", "") 1437 error_msg = kwargs.get("error_msg", "") 1438 data = {"type": stage, "name": case_name, "case_result": case_result, "error_msg": error_msg} 1439 else: 1440 LOG.error("Call lifecycle error, error param stage: {}".format(stage)) 1441 return 1442 Task.life_stage_listener(data) 1443