1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import datetime 21import os 22import queue 23import time 24import uuid 25import shutil 26import json 27from xml.etree import ElementTree 28 29from _core.utils import unique_id 30from _core.utils import check_mode 31from _core.utils import get_sub_path 32from _core.utils import get_filename_extension 33from _core.utils import convert_serial 34from _core.utils import convert_mac 35from _core.utils import get_instance_name 36from _core.utils import is_config_str 37from _core.utils import check_result_report 38from _core.utils import get_cst_time 39from _core.environment.manager_env import EnvironmentManager 40from _core.environment.manager_env import DeviceSelectionOption 41from _core.exception import ParamError 42from _core.exception import ExecuteTerminate 43from _core.exception import LiteDeviceError 44from _core.exception import DeviceError 45from _core.interface import LifeCycle 46from _core.executor.request import Request 47from _core.executor.request import Descriptor 48from _core.plugin import get_plugin 49from _core.plugin import Plugin 50from _core.plugin import Config 51from _core.report.reporter_helper import ExecInfo 52from _core.report.reporter_helper import ReportConstant 53from _core.report.reporter_helper import Case 54from _core.report.reporter_helper import DataHelper 55from _core.constants import TestExecType 56from _core.constants import CKit 57from _core.constants import ModeType 58from _core.constants import DeviceLabelType 59from _core.constants import SchedulerType 60from _core.constants import ListenerType 61from _core.constants import ConfigConst 62from _core.constants import ReportConst 63from _core.constants import HostDrivenTestType 64from _core.constants import LifeStage 65from _core.executor.concurrent import DriversThread 66from _core.executor.concurrent import QueueMonitorThread 67from _core.executor.concurrent import DriversDryRunThread 68from _core.executor.source import TestSetSource 69from _core.executor.source import find_test_descriptors 70from _core.executor.source import find_testdict_descriptors 71from _core.executor.source import TestDictSource 72from _core.logger import platform_logger 73from _core.logger import add_task_file_handler 74from _core.logger import remove_task_file_handler 75from _core.logger import add_encrypt_file_handler 76from _core.logger import remove_encrypt_file_handler 77 78__all__ = ["Scheduler"] 79LOG = platform_logger("Scheduler") 80 81MAX_VISIBLE_LENGTH = 150 82 83 84@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler) 85class Scheduler(object): 86 """ 87 The Scheduler is the main entry point for client code that wishes to 88 discover and execute tests. 89 """ 90 # factory params 91 is_execute = True 92 terminate_result = queue.Queue() 93 upload_address = "" 94 task_type = "" 95 task_name = "" 96 mode = "" 97 proxy = None 98 99 # command_queue to store test commands 100 command_queue = [] 101 max_command_num = 50 102 # the number of tests in current task 103 test_number = 0 104 device_labels = [] 105 repeat_index = 0 106 auto_retry = -1 107 is_need_auto_retry = False 108 109 queue_monitor_thread = None 110 111 def __discover__(self, args): 112 """Discover task to execute""" 113 from _core.executor.request import Task 114 config = Config() 115 config.update(args) 116 task = Task(drivers=[]) 117 task.init(config) 118 119 Scheduler.call_life_stage_action(stage=LifeStage.task_start) 120 121 root_descriptor = self._find_test_root_descriptor(task.config) 122 task.set_root_descriptor(root_descriptor) 123 return task 124 125 def __execute__(self, task): 126 error_message = "" 127 unavailable = 0 128 try: 129 Scheduler.is_execute = True 130 if Scheduler.command_queue: 131 LOG.debug("Run command: {}".format(convert_mac(Scheduler.command_queue[-1]))) 132 run_command = Scheduler.command_queue.pop() 133 task_id = str(uuid.uuid1()).split("-")[0] 134 Scheduler.command_queue.append((task_id, run_command, 135 task.config.report_path)) 136 if len(Scheduler.command_queue) > self.max_command_num: 137 Scheduler.command_queue.pop(0) 138 139 if getattr(task.config, ConfigConst.test_environment, ""): 140 self._reset_environment(task.config.get( 141 ConfigConst.test_environment, "")) 142 elif getattr(task.config, ConfigConst.configfile, ""): 143 self._reset_environment(config_file=task.config.get( 144 ConfigConst.configfile, "")) 145 146 # do with the count of repeat about a task 147 if getattr(task.config, ConfigConst.repeat, 0) > 0: 148 Scheduler.repeat_index = \ 149 getattr(task.config, ConfigConst.repeat) 150 drivers_list = list() 151 for index in range(-1, task.config.repeat): 152 repeat_list = self.construct_repeat_list(task, index) 153 if repeat_list: 154 drivers_list.extend(repeat_list) 155 task.test_drivers = drivers_list 156 else: 157 Scheduler.repeat_index = 0 158 159 self.test_number = len(task.test_drivers) 160 for des in task.root.children: 161 if des.error: 162 error_message = "{};{}".format(des.error.error_msg, error_message) 163 unavailable += 1 164 if error_message != "": 165 error_message = "test source '{}' or its config json not exists".format(error_message) 166 LOG.error("Exec task error: {}".format(error_message)) 167 168 raise ParamError(error_message) 169 170 if task.config.exectype == TestExecType.device_test: 171 self._device_test_execute(task) 172 elif task.config.exectype == TestExecType.host_test: 173 self._host_test_execute(task) 174 else: 175 LOG.info("Exec type %s is bypassed" % task.config.exectype) 176 177 except (ParamError, ValueError, TypeError, SyntaxError, AttributeError, 178 DeviceError, LiteDeviceError, ExecuteTerminate) as exception: 179 error_no = getattr(exception, "error_no", "") 180 error_message = "%s[%s]" % (str(exception), error_no) \ 181 if error_no else str(exception) 182 error_no = error_no if error_no else "00000" 183 LOG.exception(exception, exc_info=False, error_no=error_no) 184 finally: 185 Scheduler.reset_test_dict_source() 186 if getattr(task.config, ConfigConst.test_environment, "") or \ 187 getattr(task.config, ConfigConst.configfile, ""): 188 self._restore_environment() 189 190 Scheduler.call_life_stage_action(stage=LifeStage.task_end, 191 task=task, error=error_message, 192 unavailable=unavailable) 193 if Scheduler.upload_address: 194 Scheduler.upload_task_result(task, error_message) 195 Scheduler.upload_report_end() 196 197 def _device_test_execute(self, task): 198 used_devices = {} 199 try: 200 self._dynamic_concurrent_execute(task, used_devices) 201 finally: 202 Scheduler.__reset_environment__(used_devices) 203 # generate reports 204 self._generate_task_report(task, used_devices) 205 206 def _host_test_execute(self, task): 207 """Execute host test""" 208 try: 209 # initial params 210 current_driver_threads = {} 211 test_drivers = task.test_drivers 212 message_queue = queue.Queue() 213 214 # execute test drivers 215 self.queue_monitor_thread = self._start_queue_monitor( 216 message_queue, test_drivers, current_driver_threads) 217 while test_drivers: 218 if len(current_driver_threads) > 5: 219 time.sleep(3) 220 continue 221 222 # clear remaining test drivers when scheduler is terminated 223 if not Scheduler.is_execute: 224 LOG.info("Clear test drivers") 225 self._clear_not_executed(task, test_drivers) 226 break 227 228 # get test driver and device 229 test_driver = test_drivers[0] 230 231 # display executing progress 232 self._display_executing_process(None, test_driver, 233 test_drivers) 234 235 # start driver thread 236 self._start_driver_thread(current_driver_threads, ( 237 None, message_queue, task, test_driver)) 238 test_drivers.pop(0) 239 240 # wait for all drivers threads finished and do kit teardown 241 while True: 242 if not self.queue_monitor_thread.is_alive(): 243 break 244 time.sleep(3) 245 246 finally: 247 # generate reports 248 self._generate_task_report(task) 249 250 def _dry_run_device_test_execute(self, task): 251 try: 252 # initial params 253 used_devices = {} 254 current_driver_threads = {} 255 test_drivers = task.test_drivers 256 message_queue = queue.Queue() 257 task_unused_env = [] 258 259 # execute test drivers 260 self.queue_monitor_thread = self._start_queue_monitor( 261 message_queue, test_drivers, current_driver_threads) 262 while test_drivers: 263 # clear remaining test drivers when scheduler is terminated 264 if not Scheduler.is_execute: 265 LOG.info("Clear test drivers") 266 self._clear_not_executed(task, test_drivers) 267 break 268 269 # get test driver and device 270 test_driver = test_drivers[0] 271 # get environment 272 try: 273 environment = self.__allocate_environment__( 274 task.config.__dict__, test_driver) 275 except DeviceError as exception: 276 self._handle_device_error(exception, task, test_drivers) 277 continue 278 279 if not Scheduler.is_execute: 280 if environment: 281 Scheduler.__free_environment__(environment) 282 continue 283 284 # start driver thread 285 thread_id = self._get_thread_id(current_driver_threads) 286 driver_thread = DriversDryRunThread(test_driver, task, 287 environment, 288 message_queue) 289 driver_thread.setDaemon(True) 290 driver_thread.set_thread_id(thread_id) 291 driver_thread.start() 292 current_driver_threads.setdefault(thread_id, driver_thread) 293 294 test_drivers.pop(0) 295 296 # wait for all drivers threads finished and do kit teardown 297 while True: 298 if not self.queue_monitor_thread.is_alive(): 299 break 300 time.sleep(3) 301 302 self._do_taskkit_teardown(used_devices, task_unused_env) 303 finally: 304 LOG.debug( 305 "Removing report_path: {}".format(task.config.report_path)) 306 # delete reports 307 self.stop_task_logcat() 308 self.stop_encrypt_log() 309 shutil.rmtree(task.config.report_path) 310 311 def _generate_task_report(self, task, used_devices=None): 312 task_info = ExecInfo() 313 test_type = getattr(task.config, "testtype", []) 314 task_name = getattr(task.config, "task", "") 315 if task_name: 316 task_info.test_type = str(task_name).upper() 317 else: 318 task_info.test_type = ",".join(test_type) if test_type else "Test" 319 if used_devices: 320 serials = [] 321 platforms = [] 322 test_labels = [] 323 for serial, device in used_devices.items(): 324 serials.append(convert_serial(serial)) 325 platform = str(device.label).capitalize() 326 test_label = str(device.label).capitalize() 327 if platform not in platforms: 328 platforms.append(platform) 329 if test_label not in test_labels: 330 test_labels.append(test_label) 331 task_info.device_name = ",".join(serials) 332 task_info.platform = ",".join(platforms) 333 task_info.device_label = ",".join(test_labels) 334 else: 335 task_info.device_name = "None" 336 task_info.platform = "None" 337 task_info.device_label = "None" 338 task_info.test_time = task.config.start_time 339 task_info.product_info = getattr(task, "product_info", "") 340 341 listeners = self._create_listeners(task) 342 for listener in listeners: 343 listener.__ended__(LifeCycle.TestTask, task_info, 344 test_type=task_info.test_type, task=task) 345 346 @classmethod 347 def _create_listeners(cls, task): 348 listeners = [] 349 # append log listeners 350 log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log) 351 for log_listener in log_listeners: 352 log_listener_instance = log_listener.__class__() 353 listeners.append(log_listener_instance) 354 # append report listeners 355 report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report) 356 for report_listener in report_listeners: 357 report_listener_instance = report_listener.__class__() 358 setattr(report_listener_instance, "report_path", 359 task.config.report_path) 360 listeners.append(report_listener_instance) 361 # append upload listeners 362 upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload) 363 for upload_listener in upload_listeners: 364 upload_listener_instance = upload_listener.__class__() 365 listeners.append(upload_listener_instance) 366 return listeners 367 368 @staticmethod 369 def _find_device_options(environment_config, options, test_source): 370 devices_option = [] 371 index = 1 372 for device_dict in environment_config: 373 label = device_dict.get("label", "") 374 required_manager = device_dict.get("type", "device") 375 required_manager = \ 376 required_manager if required_manager else "device" 377 if not label: 378 continue 379 device_option = DeviceSelectionOption(options, label, test_source) 380 device_dict.pop("type", None) 381 device_dict.pop("label", None) 382 device_option.required_manager = required_manager 383 device_option.extend_value = device_dict 384 device_option.source_file = \ 385 test_source.config_file or test_source.source_string 386 if hasattr(device_option, "env_index"): 387 device_option.env_index = index 388 index += 1 389 devices_option.append(device_option) 390 return devices_option 391 392 def __allocate_environment__(self, options, test_driver): 393 device_options = self.get_device_options(options, 394 test_driver[1].source) 395 environment = None 396 env_manager = EnvironmentManager() 397 while True: 398 if not Scheduler.is_execute: 399 break 400 if self.queue_monitor_thread and \ 401 not self.queue_monitor_thread.is_alive(): 402 LOG.error("Queue monitor thread is dead.") 403 break 404 environment = env_manager.apply_environment(device_options) 405 if len(environment.devices) == len(device_options): 406 return environment 407 else: 408 env_manager.release_environment(environment) 409 LOG.debug("'%s' is waiting available device", 410 test_driver[1].source.test_name) 411 if env_manager.check_device_exist(device_options): 412 continue 413 else: 414 LOG.debug("'%s' required %s devices, actually %s devices" 415 " were found" % (test_driver[1].source.test_name, 416 len(device_options), 417 len(environment.devices))) 418 raise DeviceError("The '%s' required device does not exist" 419 % test_driver[1].source.source_file, 420 error_no="00104") 421 422 return environment 423 424 @classmethod 425 def get_device_options(cls, options, test_source): 426 device_options = [] 427 config_file = test_source.config_file 428 environment_config = [] 429 from _core.testkit.json_parser import JsonParser 430 if test_source.source_string and is_config_str( 431 test_source.source_string): 432 json_config = JsonParser(test_source.source_string) 433 environment_config = json_config.get_environment() 434 device_options = cls._find_device_options( 435 environment_config, options, test_source) 436 elif config_file and os.path.exists(config_file): 437 json_config = JsonParser(test_source.config_file) 438 environment_config = json_config.get_environment() 439 device_options = cls._find_device_options( 440 environment_config, options, test_source) 441 442 device_options = cls._calculate_device_options( 443 device_options, environment_config, options, test_source) 444 445 if ConfigConst.component_mapper in options.keys(): 446 required_component = options.get(ConfigConst.component_mapper). \ 447 get(test_source.module_name, None) 448 for device_option in device_options: 449 device_option.required_component = required_component 450 return device_options 451 452 @staticmethod 453 def __free_environment__(environment): 454 env_manager = EnvironmentManager() 455 env_manager.release_environment(environment) 456 457 @staticmethod 458 def __reset_environment__(used_devices): 459 env_manager = EnvironmentManager() 460 env_manager.reset_environment(used_devices) 461 462 @classmethod 463 def _check_device_spt(cls, kit, driver_request, device): 464 kit_spt = cls._parse_property_value(ConfigConst.spt, 465 driver_request, kit) 466 if not kit_spt: 467 setattr(device, ConfigConst.task_state, False) 468 LOG.error("Spt is empty", error_no="00108") 469 return 470 if getattr(driver_request, ConfigConst.product_info, ""): 471 product_info = getattr(driver_request, 472 ConfigConst.product_info) 473 if not isinstance(product_info, dict): 474 LOG.warning("Product info should be dict, %s", 475 product_info) 476 setattr(device, ConfigConst.task_state, False) 477 return 478 device_spt = product_info.get("Security Patch", None) 479 if not device_spt or not \ 480 Scheduler.compare_spt_time(kit_spt, device_spt): 481 LOG.error("The device %s spt is %s, " 482 "and the test case spt is %s, " 483 "which does not meet the requirements" % 484 (device.device_sn, device_spt, kit_spt), 485 error_no="00116") 486 setattr(device, ConfigConst.task_state, False) 487 return 488 489 def _decc_task_setup(self, environment, task): 490 config = Config() 491 config.update(task.config.__dict__) 492 config.environment = environment 493 driver_request = Request(config=config) 494 495 if environment is None: 496 return False 497 498 for device in environment.devices: 499 if not getattr(device, ConfigConst.need_kit_setup, True): 500 LOG.debug("Device %s need kit setup is false" % device) 501 continue 502 503 # do task setup for device 504 kits_copy = copy.deepcopy(task.config.kits) 505 setattr(device, ConfigConst.task_kits, kits_copy) 506 for kit in getattr(device, ConfigConst.task_kits, []): 507 if not Scheduler.is_execute: 508 break 509 try: 510 kit.__setup__(device, request=driver_request) 511 except (ParamError, ExecuteTerminate, DeviceError, 512 LiteDeviceError, ValueError, TypeError, 513 SyntaxError, AttributeError) as exception: 514 error_no = getattr(exception, "error_no", "00000") 515 LOG.exception( 516 "Task setup device: %s, exception: %s" % ( 517 environment.__get_serial__(), 518 exception), exc_info=False, error_no=error_no) 519 if kit.__class__.__name__ == CKit.query and \ 520 device.label in [DeviceLabelType.ipcamera]: 521 self._check_device_spt(kit, driver_request, device) 522 LOG.debug("Set device %s need kit setup to false" % device) 523 setattr(device, ConfigConst.need_kit_setup, False) 524 525 for device in environment.devices: 526 if not getattr(device, ConfigConst.task_state, True): 527 return False 528 529 # set product_info to self.task 530 if getattr(driver_request, ConfigConst.product_info, "") and \ 531 not getattr(task, ConfigConst.product_info, ""): 532 product_info = getattr(driver_request, ConfigConst.product_info) 533 if not isinstance(product_info, dict): 534 LOG.warning("Product info should be dict, %s", 535 product_info) 536 else: 537 setattr(task, ConfigConst.product_info, product_info) 538 return True 539 540 def _dynamic_concurrent_execute(self, task, used_devices): 541 # initial params 542 current_driver_threads = {} 543 test_drivers = task.test_drivers 544 message_queue = queue.Queue() 545 task_unused_env = [] 546 547 # execute test drivers 548 self.queue_monitor_thread = self._start_queue_monitor( 549 message_queue, test_drivers, current_driver_threads) 550 while test_drivers: 551 # clear remaining test drivers when scheduler is terminated 552 if not Scheduler.is_execute: 553 LOG.info("Clear test drivers") 554 self._clear_not_executed(task, test_drivers) 555 break 556 557 # get test driver and device 558 test_driver = test_drivers[0] 559 560 # call life stage 561 Scheduler.call_life_stage_action(stage=LifeStage.case_start, 562 case_name=test_driver[1].source.module_name) 563 564 if getattr(task.config, ConfigConst.history_report_path, ""): 565 module_name = test_driver[1].source.module_name 566 if not self.is_module_need_retry(task, module_name): 567 self._display_executing_process(None, test_driver, 568 test_drivers) 569 LOG.info("%s are passed, no need to retry" % module_name) 570 self._append_history_result(task, module_name) 571 LOG.info("") 572 test_drivers.pop(0) 573 continue 574 575 if getattr(task.config, ConfigConst.component_mapper, ""): 576 module_name = test_driver[1].source.module_name 577 self.component_task_setup(task, module_name) 578 579 # get environment 580 try: 581 environment = self.__allocate_environment__( 582 task.config.__dict__, test_driver) 583 except DeviceError as exception: 584 self._handle_device_error(exception, task, test_drivers) 585 Scheduler.call_life_stage_action(stage=LifeStage.case_end, 586 case_name=test_driver[1].source.module_name, 587 case_result="Failed", 588 error_msg=exception.args) 589 continue 590 591 if not self.queue_monitor_thread.is_alive(): 592 LOG.debug("Restart queue monitor thread.") 593 current_driver_threads = {} 594 message_queue = queue.Queue() 595 self.queue_monitor_thread = self._start_queue_monitor( 596 message_queue, test_drivers, current_driver_threads) 597 continue 598 599 if not Scheduler.is_execute: 600 if environment: 601 Scheduler.__free_environment__(environment) 602 continue 603 604 if check_mode(ModeType.decc) or getattr( 605 task.config, ConfigConst.check_device, False): 606 LOG.info("Start to check environment: %s" % 607 environment.__get_serial__()) 608 status = self._decc_task_setup(environment, task) 609 if not status: 610 Scheduler.__free_environment__(environment) 611 task_unused_env.append(environment) 612 error_message = "Load Error[00116]" 613 self.report_not_executed(task.config.report_path, 614 [test_drivers[0]], 615 error_message, task) 616 test_drivers.pop(0) 617 continue 618 else: 619 LOG.info("Environment %s check success", 620 environment.__get_serial__()) 621 622 # display executing progress 623 self._display_executing_process(environment, test_driver, 624 test_drivers) 625 626 # add to used devices and set need_kit_setup attribute 627 self._append_used_devices(environment, used_devices) 628 629 # start driver thread 630 self._start_driver_thread(current_driver_threads, ( 631 environment, message_queue, task, test_driver)) 632 test_drivers.pop(0) 633 634 # wait for all drivers threads finished and do kit teardown 635 while True: 636 if not self.queue_monitor_thread.is_alive(): 637 break 638 time.sleep(3) 639 640 self._do_taskkit_teardown(used_devices, task_unused_env) 641 642 @classmethod 643 def _append_history_result(cls, task, module_name): 644 history_report_path = getattr( 645 task.config, ConfigConst.history_report_path, "") 646 from _core.report.result_reporter import ResultReporter 647 params = ResultReporter.get_task_info_params( 648 history_report_path) 649 650 if not params or not params[ReportConst.data_reports]: 651 LOG.debug("Task info record data reports is empty") 652 return 653 654 report_data_dict = dict(params[ReportConst.data_reports]) 655 if module_name not in report_data_dict.keys(): 656 module_name_ = str(module_name).split(".")[0] 657 if module_name_ not in report_data_dict.keys(): 658 LOG.error("%s not in data reports" % module_name) 659 return 660 module_name = module_name_ 661 662 from xdevice import SuiteReporter 663 if check_mode(ModeType.decc): 664 virtual_report_path, report_result = SuiteReporter. \ 665 get_history_result_by_module(module_name) 666 LOG.debug("Append history result: (%s, %s)" % ( 667 virtual_report_path, report_result)) 668 SuiteReporter.append_report_result( 669 (virtual_report_path, report_result)) 670 else: 671 history_execute_result = report_data_dict.get(module_name, "") 672 LOG.info("Start copy %s" % history_execute_result) 673 file_name = get_filename_extension(history_execute_result)[0] 674 if os.path.exists(history_execute_result): 675 result_dir = \ 676 os.path.join(task.config.report_path, "result") 677 os.makedirs(result_dir, exist_ok=True) 678 target_execute_result = "%s.xml" % os.path.join( 679 task.config.report_path, "result", file_name) 680 shutil.copyfile(history_execute_result, target_execute_result) 681 LOG.info("Copy %s to %s" % ( 682 history_execute_result, target_execute_result)) 683 else: 684 error_msg = "Copy failed! %s not exists!" % \ 685 history_execute_result 686 raise ParamError(error_msg) 687 688 def _handle_device_error(self, exception, task, test_drivers): 689 self._display_executing_process(None, test_drivers[0], test_drivers) 690 error_message = "%s: %s" % \ 691 (get_instance_name(exception), exception) 692 LOG.exception(error_message, exc_info=False, 693 error_no=exception.error_no) 694 if check_mode(ModeType.decc): 695 error_message = "Load Error[00104]" 696 self.report_not_executed(task.config.report_path, [test_drivers[0]], 697 error_message, task) 698 699 LOG.info("") 700 test_drivers.pop(0) 701 702 @classmethod 703 def _clear_not_executed(cls, task, test_drivers): 704 if Scheduler.mode != ModeType.decc: 705 # clear all 706 test_drivers.clear() 707 return 708 # The result is reported only in DECC mode, and also clear all. 709 LOG.error("Case no run: task execution terminated!", error_no="00300") 710 error_message = "Execute Terminate[00300]" 711 cls.report_not_executed(task.config.report_path, test_drivers, 712 error_message) 713 test_drivers.clear() 714 715 @classmethod 716 def report_not_executed(cls, report_path, test_drivers, error_message, 717 task=None): 718 # traversing list to get remained elements 719 for test_driver in test_drivers: 720 # get report file 721 if task and getattr(task.config, "testdict", ""): 722 report_file = os.path.join(get_sub_path( 723 test_driver[1].source.source_file), 724 "%s.xml" % test_driver[1].source.test_name) 725 else: 726 report_file = os.path.join( 727 report_path, "result", 728 "%s.xml" % test_driver[1].source.module_name) 729 730 # get report name 731 report_name = test_driver[1].source.test_name if \ 732 not test_driver[1].source.test_name.startswith("{") \ 733 else "report" 734 735 # get module name 736 module_name = test_driver[1].source.module_name 737 738 # here, normally create empty report and then upload result 739 check_result_report(report_path, report_file, error_message, 740 report_name, module_name) 741 742 def _start_driver_thread(self, current_driver_threads, thread_params): 743 environment, message_queue, task, test_driver = thread_params 744 thread_id = self._get_thread_id(current_driver_threads) 745 driver_thread = DriversThread(test_driver, task, environment, 746 message_queue) 747 driver_thread.setDaemon(True) 748 driver_thread.set_thread_id(thread_id) 749 driver_thread.set_listeners(self._create_listeners(task)) 750 driver_thread.start() 751 current_driver_threads.setdefault(thread_id, driver_thread) 752 LOG.info(f"Driver executing in thread {driver_thread.ident}") 753 LOG.info(f"Thread id: {thread_id} execute started") 754 755 @classmethod 756 def _do_taskkit_teardown(cls, used_devices, task_unused_env): 757 for device in used_devices.values(): 758 if getattr(device, ConfigConst.need_kit_setup, True): 759 continue 760 761 for kit in getattr(device, ConfigConst.task_kits, []): 762 try: 763 kit.__teardown__(device) 764 except Exception as error: 765 LOG.debug("Do task kit teardown: %s" % error) 766 setattr(device, ConfigConst.task_kits, []) 767 setattr(device, ConfigConst.need_kit_setup, True) 768 769 for environment in task_unused_env: 770 for device in environment.devices: 771 setattr(device, ConfigConst.task_state, True) 772 setattr(device, ConfigConst.need_kit_setup, True) 773 774 def _display_executing_process(self, environment, test_driver, 775 test_drivers): 776 source_content = test_driver[1].source.source_file or \ 777 test_driver[1].source.source_string 778 if environment is None: 779 LOG.info("[%d / %d] Executing: %s, Driver: %s" % 780 (self.test_number - len(test_drivers) + 1, 781 self.test_number, source_content, 782 test_driver[1].source.test_type)) 783 return 784 785 LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" % 786 (self.test_number - len(test_drivers) + 1, 787 self.test_number, source_content, 788 environment.__get_serial__(), 789 test_driver[1].source.test_type)) 790 791 @classmethod 792 def _get_thread_id(cls, current_driver_threads): 793 thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f') 794 while thread_id in current_driver_threads.keys(): 795 thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f') 796 return thread_id 797 798 @classmethod 799 def _append_used_devices(cls, environment, used_devices): 800 if environment is not None: 801 for device in environment.devices: 802 device_serial = device.__get_serial__() if device else "None" 803 if device_serial and device_serial not in used_devices.keys(): 804 used_devices[device_serial] = device 805 806 @staticmethod 807 def _start_queue_monitor(message_queue, test_drivers, 808 current_driver_threads): 809 queue_monitor_thread = QueueMonitorThread(message_queue, 810 current_driver_threads, 811 test_drivers) 812 queue_monitor_thread.setDaemon(True) 813 queue_monitor_thread.start() 814 return queue_monitor_thread 815 816 def exec_command(self, command, options): 817 """ 818 Directly executes a command without adding it to the command queue. 819 """ 820 if command != "run": 821 raise ParamError("unsupported command action: %s" % command, 822 error_no="00100") 823 exec_type = options.exectype 824 if exec_type in [TestExecType.device_test, TestExecType.host_test, 825 TestExecType.host_driven_test]: 826 self._exec_task(options) 827 else: 828 LOG.error("Unsupported execution type '%s'" % exec_type, 829 error_no="00100") 830 831 return 832 833 def _exec_task(self, options): 834 """ 835 Directly allocates a device and execute a device test. 836 """ 837 try: 838 self.check_auto_retry(options) 839 task = self.__discover__(options.__dict__) 840 self.__execute__(task) 841 except (ParamError, ValueError, TypeError, SyntaxError, 842 AttributeError) as exception: 843 error_no = getattr(exception, "error_no", "00000") 844 LOG.exception("%s: %s" % (get_instance_name(exception), exception), 845 exc_info=False, error_no=error_no) 846 if Scheduler.upload_address: 847 Scheduler.upload_unavailable_result(str(exception.args)) 848 Scheduler.upload_report_end() 849 finally: 850 self.stop_task_logcat() 851 self.stop_encrypt_log() 852 self.start_auto_retry() 853 854 @classmethod 855 def _reset_environment(cls, environment="", config_file=""): 856 env_manager = EnvironmentManager() 857 env_manager.env_stop() 858 EnvironmentManager(environment, config_file) 859 860 @classmethod 861 def _restore_environment(cls): 862 env_manager = EnvironmentManager() 863 env_manager.env_stop() 864 EnvironmentManager() 865 866 @classmethod 867 def start_task_log(cls, log_path): 868 tool_file_name = "task_log.log" 869 tool_log_file = os.path.join(log_path, tool_file_name) 870 add_task_file_handler(tool_log_file) 871 872 @classmethod 873 def start_encrypt_log(cls, log_path): 874 from _core.report.encrypt import check_pub_key_exist 875 if check_pub_key_exist(): 876 encrypt_file_name = "task_log.ept" 877 encrypt_log_file = os.path.join(log_path, encrypt_file_name) 878 add_encrypt_file_handler(encrypt_log_file) 879 880 @classmethod 881 def stop_task_logcat(cls): 882 remove_task_file_handler() 883 884 @classmethod 885 def stop_encrypt_log(cls): 886 remove_encrypt_file_handler() 887 888 @staticmethod 889 def _find_test_root_descriptor(config): 890 if getattr(config, ConfigConst.task, None) or \ 891 getattr(config, ConfigConst.testargs, None): 892 Scheduler._pre_component_test(config) 893 894 if getattr(config, ConfigConst.subsystems, "") or \ 895 getattr(config, ConfigConst.parts, "") or \ 896 getattr(config, ConfigConst.component_base_kit, ""): 897 uid = unique_id("Scheduler", "component") 898 if config.subsystems or config.parts: 899 test_set = (config.subsystems, config.parts) 900 else: 901 kit = getattr(config, ConfigConst.component_base_kit) 902 test_set = kit.get_white_list() 903 904 root = Descriptor(uuid=uid, name="component", 905 source=TestSetSource(test_set), 906 con=True) 907 908 root.children = find_test_descriptors(config) 909 return root 910 # read test list from testdict 911 if getattr(config, ConfigConst.testdict, "") != "" and getattr( 912 config, ConfigConst.testfile, "") == "": 913 uid = unique_id("Scheduler", "testdict") 914 root = Descriptor(uuid=uid, name="testdict", 915 source=TestSetSource(config.testdict), 916 con=True) 917 root.children = find_testdict_descriptors(config) 918 return root 919 920 # read test list from testfile, testlist or task 921 test_set = getattr(config, ConfigConst.testfile, "") or getattr( 922 config, ConfigConst.testlist, "") or getattr( 923 config, ConfigConst.task, "") or getattr( 924 config, ConfigConst.testcase) 925 if test_set: 926 fname, _ = get_filename_extension(test_set) 927 uid = unique_id("Scheduler", fname) 928 root = Descriptor(uuid=uid, name=fname, 929 source=TestSetSource(test_set), con=True) 930 root.children = find_test_descriptors(config) 931 return root 932 else: 933 raise ParamError("no test file, list, dict, case or task found", 934 error_no="00102") 935 936 @classmethod 937 def terminate_cmd_exec(cls): 938 Scheduler.is_execute = False 939 Scheduler.repeat_index = 0 940 Scheduler.auto_retry = -1 941 LOG.info("Start to terminate execution") 942 return Scheduler.terminate_result.get() 943 944 @classmethod 945 def upload_case_result(cls, upload_param): 946 if not Scheduler.upload_address: 947 return 948 case_id, result, error, start_time, end_time, report_path = \ 949 upload_param 950 if error and len(error) > MAX_VISIBLE_LENGTH: 951 error = "%s..." % error[:MAX_VISIBLE_LENGTH] 952 LOG.info( 953 "Get upload params: %s, %s, %s, %s, %s, %s" % ( 954 case_id, result, error, start_time, end_time, report_path)) 955 if Scheduler.proxy is not None: 956 Scheduler.proxy.upload_result(case_id, result, error, start_time, 957 end_time, report_path) 958 else: 959 LOG.debug("There is no proxy, can't upload case result") 960 961 @classmethod 962 def upload_module_result(cls, exec_message): 963 if not Scheduler.is_execute: 964 return 965 result_file = exec_message.get_result() 966 request = exec_message.get_request() 967 test_name = request.root.source.test_name 968 if not result_file or not os.path.exists(result_file): 969 LOG.error("%s result not exists", test_name, error_no="00200") 970 return 971 972 test_type = request.root.source.test_type 973 LOG.info("Need upload result: %s, test type: %s" % 974 (result_file, test_type)) 975 upload_params, _, _ = cls._get_upload_params(result_file, request) 976 if not upload_params: 977 LOG.error("%s no test case result to upload" % result_file, 978 error_no="00201") 979 return 980 LOG.info("Need upload %s case" % len(upload_params)) 981 upload_suite = [] 982 for upload_param in upload_params: 983 case_id, result, error, start_time, end_time, report_path = \ 984 upload_param 985 case = {"caseid": case_id, "result": result, "error": error, 986 "start": start_time, "end": end_time, 987 "report": report_path} 988 LOG.info("Case info: %s", case) 989 upload_suite.append(case) 990 if Scheduler.proxy is not None: 991 Scheduler.proxy.upload_batch(upload_suite) 992 else: 993 LOG.debug("There is no proxy, can't upload module result") 994 995 @classmethod 996 def _get_upload_params(cls, result_file, request): 997 upload_params = [] 998 report_path = result_file 999 testsuites_element = DataHelper.parse_data_report(report_path) 1000 start_time, end_time = cls._get_time(testsuites_element) 1001 test_type = request.get_test_type() 1002 if test_type == HostDrivenTestType.device_test or test_type == HostDrivenTestType.windows_test: 1003 for model_element in testsuites_element: 1004 case_id = model_element.get(ReportConstant.name, "") 1005 case_result, error = cls.get_script_result(model_element) 1006 if error and len(error) > MAX_VISIBLE_LENGTH: 1007 error = "{}...".format(error[:MAX_VISIBLE_LENGTH]) 1008 report = cls._get_report_path( 1009 request.config.report_path, 1010 model_element.get(ReportConstant.report, "")) 1011 upload_params.append( 1012 (case_id, case_result, error, start_time, end_time, report,)) 1013 else: 1014 for testsuite_element in testsuites_element: 1015 if check_mode(ModeType.developer): 1016 module_name = str(get_filename_extension( 1017 report_path)[0]).split(".")[0] 1018 else: 1019 module_name = testsuite_element.get(ReportConstant.name, 1020 "none") 1021 for case_element in testsuite_element: 1022 case_id = cls._get_case_id(case_element, module_name) 1023 case_result, error = cls._get_case_result(case_element) 1024 if case_result == "Ignored": 1025 LOG.info( 1026 "Get upload params: {} result is ignored".format(case_id)) 1027 continue 1028 if error and len(error) > MAX_VISIBLE_LENGTH: 1029 error = "{}...".format(error[:MAX_VISIBLE_LENGTH]) 1030 report = cls._get_report_path( 1031 request.config.report_path, 1032 case_element.get(ReportConstant.report, "")) 1033 upload_params.append( 1034 (case_id, case_result, error, start_time, end_time, report,)) 1035 return upload_params, start_time, end_time 1036 1037 @classmethod 1038 def get_script_result(cls, model_element): 1039 disabled = int(model_element.get(ReportConstant.disabled)) if \ 1040 model_element.get(ReportConstant.disabled, "") else 0 1041 failures = int(model_element.get(ReportConstant.failures)) if \ 1042 model_element.get(ReportConstant.failures, "") else 0 1043 errors = int(model_element.get(ReportConstant.errors)) if \ 1044 model_element.get(ReportConstant.errors, "") else 0 1045 unavailable = int(model_element.get(ReportConstant.unavailable)) if \ 1046 model_element.get(ReportConstant.unavailable, "") else 0 1047 if failures > 0 or errors > 0: 1048 result = "Failed" 1049 elif disabled > 0 or unavailable > 0: 1050 result = "Unavailable" 1051 else: 1052 result = "Passed" 1053 1054 if result == "Passed": 1055 return result, "" 1056 if Scheduler.mode == ModeType.decc: 1057 result = "Failed" 1058 result_kind = model_element.get(ReportConstant.result_kind, "") 1059 if result_kind: 1060 result = result_kind 1061 1062 error_msg = model_element.get(ReportConstant.message, "") 1063 if not error_msg and len(model_element) > 0: 1064 error_msg = model_element[0].get(ReportConstant.message, "") 1065 if not error_msg and len(model_element[0]) > 0: 1066 error_msg = model_element[0][0].get(ReportConstant.message, "") 1067 return result, error_msg 1068 1069 @classmethod 1070 def _get_case_id(cls, case_element, package_name): 1071 class_name = case_element.get(ReportConstant.class_name, "none") 1072 method_name = case_element.get(ReportConstant.name, "none") 1073 case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name, 1074 class_name, method_name) 1075 return case_id 1076 1077 @classmethod 1078 def _get_case_result(cls, case_element): 1079 # get result 1080 case = Case() 1081 case.status = case_element.get(ReportConstant.status, "") 1082 case.result = case_element.get(ReportConstant.result, "") 1083 if case_element.get(ReportConstant.message, ""): 1084 case.message = case_element.get(ReportConstant.message) 1085 if len(case_element) > 0: 1086 if not case.result: 1087 case.result = ReportConstant.false 1088 case.message = case_element[0].get(ReportConstant.message) 1089 if case.is_passed(): 1090 result = "Passed" 1091 elif case.is_failed(): 1092 result = "Failed" 1093 elif case.is_blocked(): 1094 result = "Blocked" 1095 elif case.is_ignored(): 1096 result = "Ignored" 1097 elif case.is_completed(): 1098 if case.message: 1099 result = "Failed" 1100 else: 1101 result = "Passed" 1102 else: 1103 result = "Unavailable" 1104 return result, case.message 1105 1106 @classmethod 1107 def _get_time(cls, testsuite_element): 1108 start_time = testsuite_element.get(ReportConstant.start_time, "") 1109 end_time = testsuite_element.get(ReportConstant.end_time, "") 1110 try: 1111 if start_time and end_time: 1112 start_time = int(time.mktime(time.strptime( 1113 start_time, ReportConstant.time_format)) * 1000) 1114 end_time = int(time.mktime(time.strptime( 1115 end_time, ReportConstant.time_format)) * 1000) 1116 else: 1117 timestamp = str(testsuite_element.get( 1118 ReportConstant.time_stamp, "")).replace("T", " ") 1119 cost_time = testsuite_element.get(ReportConstant.time, "") 1120 if timestamp and cost_time: 1121 try: 1122 end_time = int(time.mktime(time.strptime( 1123 timestamp, ReportConstant.time_format)) * 1000) 1124 except ArithmeticError as error: 1125 LOG.error("Get time error {}".format(error)) 1126 end_time = int(time.time() * 1000) 1127 except ValueError as error: 1128 LOG.error("Get time error {}".format(error)) 1129 end_time = int(time.mktime(time.strptime( 1130 timestamp.split(".")[0], ReportConstant.time_format)) * 1000) 1131 start_time = int(end_time - float(cost_time) * 1000) 1132 else: 1133 current_time = int(time.time() * 1000) 1134 start_time, end_time = current_time, current_time 1135 except ArithmeticError as error: 1136 LOG.error("Get time error {}".format(error)) 1137 current_time = int(time.time() * 1000) 1138 start_time, end_time = current_time, current_time 1139 return start_time, end_time 1140 1141 @classmethod 1142 def _get_report_path(cls, base_path, report=""): 1143 """ get report path 1144 base_path: str, report base path 1145 report : str, report relative path 1146 """ 1147 report_path = os.path.join(base_path, report) 1148 return report_path if report and os.path.exists(report_path) else base_path 1149 1150 @classmethod 1151 def upload_task_result(cls, task, error_message=""): 1152 if not Scheduler.task_name: 1153 LOG.info("No need upload summary report") 1154 return 1155 1156 summary_data_report = os.path.join(task.config.report_path, 1157 ReportConstant.summary_data_report) 1158 if not os.path.exists(summary_data_report): 1159 Scheduler.upload_unavailable_result(str( 1160 error_message) or "summary report not exists", 1161 task.config.report_path) 1162 return 1163 1164 task_element = ElementTree.parse(summary_data_report).getroot() 1165 start_time, end_time = cls._get_time(task_element) 1166 task_result = cls._get_task_result(task_element) 1167 error_msg = "" 1168 for child in task_element: 1169 if child.get(ReportConstant.message, ""): 1170 error_msg = "{}{}".format( 1171 error_msg, "%s;" % child.get(ReportConstant.message)) 1172 if error_msg: 1173 error_msg = error_msg[:-1] 1174 report = cls._get_report_path( 1175 task.config.report_path, ReportConstant.summary_vision_report) 1176 cls.upload_case_result( 1177 (Scheduler.task_name, task_result, error_msg, start_time, end_time, report)) 1178 1179 @classmethod 1180 def _get_task_result(cls, task_element): 1181 failures = int(task_element.get(ReportConstant.failures, 0)) 1182 errors = int(task_element.get(ReportConstant.errors, 0)) 1183 disabled = int(task_element.get(ReportConstant.disabled, 0)) 1184 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1185 if disabled > 0: 1186 task_result = "Blocked" 1187 elif errors > 0 or failures > 0: 1188 task_result = "Failed" 1189 elif unavailable > 0: 1190 task_result = "Unavailable" 1191 else: 1192 task_result = "Passed" 1193 return task_result 1194 1195 @classmethod 1196 def upload_unavailable_result(cls, error_msg, report_path=""): 1197 start_time = int(time.time() * 1000) 1198 Scheduler.upload_case_result((Scheduler.task_name, "Unavailable", 1199 error_msg, start_time, start_time, 1200 report_path)) 1201 1202 @classmethod 1203 def upload_report_end(cls): 1204 if getattr(cls, "tmp_json", None): 1205 os.remove(cls.tmp_json) 1206 del cls.tmp_json 1207 LOG.info("Upload report end") 1208 if Scheduler.proxy is not None: 1209 Scheduler.proxy.report_end() 1210 else: 1211 LOG.debug("There is no proxy, can't upload report end") 1212 1213 @classmethod 1214 def is_module_need_retry(cls, task, module_name): 1215 failed_flag = False 1216 if check_mode(ModeType.decc): 1217 from xdevice import SuiteReporter 1218 for module, _ in SuiteReporter.get_failed_case_list(): 1219 if module_name == module or str(module_name).split( 1220 ".")[0] == module: 1221 failed_flag = True 1222 break 1223 else: 1224 from xdevice import ResultReporter 1225 history_report_path = \ 1226 getattr(task.config, ConfigConst.history_report_path, "") 1227 params = ResultReporter.get_task_info_params(history_report_path) 1228 if params and params[ReportConst.unsuccessful_params]: 1229 if dict(params[ReportConst.unsuccessful_params]).get( 1230 module_name, []): 1231 failed_flag = True 1232 elif dict(params[ReportConst.unsuccessful_params]).get( 1233 str(module_name).split(".")[0], []): 1234 failed_flag = True 1235 return failed_flag 1236 1237 @classmethod 1238 def compare_spt_time(cls, kit_spt, device_spt): 1239 if not kit_spt or not device_spt: 1240 return False 1241 try: 1242 kit_time = str(kit_spt).split("-")[:2] 1243 device_time = str(device_spt).split("-")[:2] 1244 k_spt = datetime.datetime.strptime( 1245 "-".join(kit_time), "%Y-%m") 1246 d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m") 1247 except ValueError as value_error: 1248 LOG.debug("Date format is error, %s" % value_error.args) 1249 return False 1250 month_interval = int(k_spt.month) - int(d_spt.month) 1251 year_interval = int(k_spt.year) - int(d_spt.year) 1252 LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)" 1253 % (k_spt.year, k_spt.month, d_spt.year, d_spt.month)) 1254 if year_interval < 0: 1255 return True 1256 if year_interval == 0 and month_interval in range(-11, 3): 1257 return True 1258 if year_interval == 1 and month_interval + 12 in (1, 2): 1259 return True 1260 return False 1261 1262 @classmethod 1263 def _parse_property_value(cls, property_name, driver_request, kit): 1264 test_args = copy.deepcopy( 1265 driver_request.config.get(ConfigConst.testargs, dict())) 1266 property_value = "" 1267 if ConfigConst.pass_through in test_args.keys(): 1268 pt_dict = json.loads(test_args.get(ConfigConst.pass_through, "")) 1269 property_value = pt_dict.get(property_name, None) 1270 elif property_name in test_args.keys: 1271 property_value = test_args.get(property_name, None) 1272 return property_value if property_value else \ 1273 kit.properties.get(property_name, None) 1274 1275 @classmethod 1276 def _calculate_device_options(cls, device_options, environment_config, 1277 options, test_source): 1278 # calculate difference 1279 diff_value = len(environment_config) - len(device_options) 1280 if device_options and diff_value == 0: 1281 return device_options 1282 1283 else: 1284 diff_value = diff_value if diff_value else 1 1285 if str(test_source.source_file).endswith(".bin"): 1286 device_option = DeviceSelectionOption( 1287 options, DeviceLabelType.ipcamera, test_source) 1288 else: 1289 device_option = DeviceSelectionOption( 1290 options, None, test_source) 1291 1292 device_option.source_file = \ 1293 test_source.source_file or test_source.source_string 1294 device_option.required_manager = "device" 1295 device_options.extend([device_option] * diff_value) 1296 LOG.debug("Assign device options and it's length is %s" 1297 % len(device_options)) 1298 return device_options 1299 1300 @classmethod 1301 def update_test_type_in_source(cls, key, value): 1302 LOG.debug("update test type dict in source") 1303 TestDictSource.test_type[key] = value 1304 1305 @classmethod 1306 def update_ext_type_in_source(cls, key, value): 1307 LOG.debug("update ext type dict in source") 1308 TestDictSource.exe_type[key] = value 1309 1310 @classmethod 1311 def clear_test_dict_source(cls): 1312 TestDictSource.clear() 1313 1314 @classmethod 1315 def reset_test_dict_source(cls): 1316 TestDictSource.reset() 1317 1318 @classmethod 1319 def _pre_component_test(cls, config): 1320 if not config.kits: 1321 return 1322 cur_kit = None 1323 for kit in config.kits: 1324 if kit.__class__.__name__ == CKit.component: 1325 cur_kit = kit 1326 break 1327 if not cur_kit: 1328 return 1329 get_white_list = getattr(cur_kit, "get_white_list", None) 1330 if not callable(get_white_list): 1331 return 1332 subsystems, parts = get_white_list() 1333 if not subsystems and not parts: 1334 return 1335 setattr(config, ConfigConst.component_base_kit, cur_kit) 1336 1337 @classmethod 1338 def component_task_setup(cls, task, module_name): 1339 component_kit = task.config.get(ConfigConst.component_base_kit, None) 1340 if not component_kit: 1341 # only -p -s .you do not care about the components that can be 1342 # supported. you only want to run the use cases of the current 1343 # component 1344 return 1345 LOG.debug("Start component task setup") 1346 _component_mapper = task.config.get(ConfigConst.component_mapper) 1347 _subsystem, _part = _component_mapper.get(module_name) 1348 1349 is_hit = False 1350 # find in cache. if not find, update cache 1351 cache_subsystem, cache_part = component_kit.get_cache() 1352 if _subsystem in cache_subsystem or _part in cache_subsystem: 1353 is_hit = True 1354 if not is_hit: 1355 env_manager = EnvironmentManager() 1356 for _, manager in env_manager.managers.items(): 1357 if getattr(manager, "devices_list", []): 1358 for device in manager.devices_list: 1359 component_kit.__setup__(device) 1360 cache_subsystem, cache_part = component_kit.get_cache() 1361 if _subsystem in cache_subsystem or _part in cache_subsystem: 1362 is_hit = True 1363 if not is_hit: 1364 LOG.warning("%s are skipped, no suitable component found. " 1365 "Require subsystem=%s part=%s, no device match this" 1366 % (module_name, _subsystem, _part)) 1367 1368 @classmethod 1369 def construct_repeat_list(cls, task, index): 1370 repeat_list = list() 1371 for driver_index in range(len(task.test_drivers)): 1372 cur_test_driver = copy.deepcopy(task.test_drivers[driver_index]) 1373 desc = cur_test_driver[1] 1374 desc.unique_id = '{}_{}'.format(desc.unique_id, index + 1) 1375 repeat_list.append(cur_test_driver) 1376 return repeat_list 1377 1378 @classmethod 1379 def start_auto_retry(cls): 1380 if not Scheduler.is_need_auto_retry: 1381 Scheduler.auto_retry = -1 1382 LOG.debug("No need auto retry") 1383 return 1384 if Scheduler.auto_retry > 0: 1385 Scheduler.auto_retry -= 1 1386 if Scheduler.auto_retry == 0: 1387 Scheduler.auto_retry = -1 1388 from _core.command.console import Console 1389 console = Console() 1390 console.command_parser("run --retry") 1391 1392 @classmethod 1393 def check_auto_retry(cls, options): 1394 if Scheduler.auto_retry < 0 and \ 1395 int(getattr(options, ConfigConst.auto_retry, 0)) > 0: 1396 value = int(getattr(options, ConfigConst.auto_retry, 0)) 1397 Scheduler.auto_retry = value if value <= 10 else 10 1398 1399 @staticmethod 1400 def call_life_stage_action(**kwargs): 1401 """ 1402 call in different lift stage 1403 """ 1404 from xdevice import Task 1405 from xdevice import Variables 1406 if Task.life_stage_listener is None: 1407 return 1408 stage = kwargs.get("stage", None) 1409 data = dict() 1410 if stage == LifeStage.task_start: 1411 data = {"type": stage, "name": Variables.task_name} 1412 elif stage == LifeStage.task_end: 1413 task = kwargs.get("task", None) 1414 error = kwargs.get("error", "") 1415 unavailable = kwargs.get("unavailable", 0) 1416 summary_data_report = os.path.join(task.config.report_path, 1417 ReportConstant.summary_data_report) if task else "" 1418 if not os.path.exists(summary_data_report): 1419 LOG.error("Call lifecycle error, summary report {} not exists".format(task.config.report_path)) 1420 passed = failures = blocked = 0 1421 else: 1422 task_element = ElementTree.parse(summary_data_report).getroot() 1423 total_tests = int(task_element.get(ReportConstant.tests, 0)) 1424 failures = int(task_element.get(ReportConstant.failures, 0)) 1425 blocked = int(task_element.get(ReportConstant.disabled, 0)) 1426 ignored = int(task_element.get(ReportConstant.ignored, 0)) 1427 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1428 passed = total_tests - failures - blocked - ignored 1429 data = {"type": stage, "name": Variables.task_name, 1430 "passed": passed, "failures": failures, 1431 "blocked": blocked, "unavailable": unavailable, 1432 "error": error} 1433 elif stage == LifeStage.case_start: 1434 case_name = kwargs.get("case_name", "") 1435 data = {"type": stage, "name": case_name} 1436 elif stage == LifeStage.case_end: 1437 case_name = kwargs.get("case_name", "") 1438 case_result = kwargs.get("case_result", "") 1439 error_msg = kwargs.get("error_msg", "") 1440 data = {"type": stage, "name": case_name, "case_result": case_result, "error_msg": error_msg} 1441 else: 1442 LOG.error("Call lifecycle error, error param stage: {}".format(stage)) 1443 return 1444 Task.life_stage_listener(data) 1445