1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import datetime 21import os 22import queue 23import time 24import uuid 25import shutil 26from xml.etree import ElementTree 27 28from _core.utils import unique_id 29from _core.utils import check_mode 30from _core.utils import get_sub_path 31from _core.utils import get_filename_extension 32from _core.utils import convert_serial 33from _core.utils import get_instance_name 34from _core.utils import is_config_str 35from _core.utils import check_result_report 36from _core.utils import get_cst_time 37from _core.environment.manager_env import EnvironmentManager 38from _core.environment.manager_env import DeviceSelectionOption 39from _core.exception import ParamError 40from _core.exception import ExecuteTerminate 41from _core.exception import LiteDeviceError 42from _core.exception import DeviceError 43from _core.interface import LifeCycle 44from _core.executor.request import Request 45from _core.executor.request import Task 46from _core.executor.request import Descriptor 47from _core.plugin import get_plugin 48from _core.plugin import Plugin 49from _core.plugin import Config 50from _core.report.reporter_helper import ExecInfo 51from _core.report.reporter_helper import ReportConstant 52from _core.report.reporter_helper import Case 53from _core.report.reporter_helper import DataHelper 54from _core.constants import TestExecType 55from _core.constants import CKit 56from _core.constants import ModeType 57from _core.constants import DeviceLabelType 58from _core.constants import SchedulerType 59from _core.constants import ListenerType 60from _core.constants import ConfigConst 61from _core.constants import ReportConst 62from _core.constants import HostDrivenTestType 63from _core.executor.concurrent import DriversThread 64from _core.executor.concurrent import QueueMonitorThread 65from _core.executor.concurrent import DriversDryRunThread 66from _core.executor.source import TestSetSource 67from _core.executor.source import find_test_descriptors 68from _core.executor.source import find_testdict_descriptors 69from _core.executor.source import TestDictSource 70from _core.logger import platform_logger 71from _core.logger import add_task_file_handler 72from _core.logger import remove_task_file_handler 73from _core.logger import add_encrypt_file_handler 74from _core.logger import remove_encrypt_file_handler 75 76__all__ = ["Scheduler"] 77LOG = platform_logger("Scheduler") 78 79MAX_VISIBLE_LENGTH = 150 80 81 82@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler) 83class Scheduler(object): 84 """ 85 The Scheduler is the main entry point for client code that wishes to 86 discover and execute tests. 87 """ 88 # factory params 89 is_execute = True 90 terminate_result = queue.Queue() 91 upload_address = "" 92 task_type = "" 93 task_name = "" 94 mode = "" 95 proxy = None 96 97 # command_queue to store test commands 98 command_queue = [] 99 max_command_num = 50 100 # the number of tests in current task 101 test_number = 0 102 device_labels = [] 103 auto_retry = -1 104 is_need_auto_retry = False 105 106 def __discover__(self, args): 107 """Discover task to execute""" 108 config = Config() 109 config.update(args) 110 task = Task(drivers=[]) 111 task.init(config) 112 113 root_descriptor = self._find_test_root_descriptor(task.config) 114 task.set_root_descriptor(root_descriptor) 115 return task 116 117 def __execute__(self, task): 118 error_message = "" 119 try: 120 Scheduler.is_execute = True 121 if Scheduler.command_queue: 122 LOG.debug("Run command: %s" % Scheduler.command_queue[-1]) 123 run_command = Scheduler.command_queue.pop() 124 task_id = str(uuid.uuid1()).split("-")[0] 125 Scheduler.command_queue.append((task_id, run_command, 126 task.config.report_path)) 127 if len(Scheduler.command_queue) > self.max_command_num: 128 Scheduler.command_queue.pop(0) 129 130 if getattr(task.config, ConfigConst.test_environment, ""): 131 self._reset_environment(task.config.get( 132 ConfigConst.test_environment, "")) 133 elif getattr(task.config, ConfigConst.configfile, ""): 134 self._reset_environment(config_file=task.config.get( 135 ConfigConst.configfile, "")) 136 137 # do with the count of repeat about a task 138 if getattr(task.config, ConfigConst.repeat, 0) > 0: 139 drivers_list = list() 140 for repeat_index in range(task.config.repeat): 141 for driver_index in range(len(task.test_drivers)): 142 drivers_list.append( 143 copy.deepcopy(task.test_drivers[driver_index])) 144 task.test_drivers = drivers_list 145 146 self.test_number = len(task.test_drivers) 147 148 if task.config.exectype == TestExecType.device_test: 149 self._device_test_execute(task) 150 elif task.config.exectype == TestExecType.host_test: 151 self._host_test_execute(task) 152 else: 153 LOG.info("Exec type %s is bypassed" % task.config.exectype) 154 155 except (ParamError, ValueError, TypeError, SyntaxError, AttributeError, 156 DeviceError, LiteDeviceError, ExecuteTerminate) as exception: 157 error_no = getattr(exception, "error_no", "") 158 error_message = "%s[%s]" % (str(exception), error_no) \ 159 if error_no else str(exception) 160 error_no = error_no if error_no else "00000" 161 LOG.exception(exception, exc_info=False, error_no=error_no) 162 163 finally: 164 Scheduler.reset_test_dict_source() 165 if getattr(task.config, ConfigConst.test_environment, "") or \ 166 getattr(task.config, ConfigConst.configfile, ""): 167 self._restore_environment() 168 169 if Scheduler.upload_address: 170 Scheduler.upload_task_result(task, error_message) 171 Scheduler.upload_report_end() 172 173 def _device_test_execute(self, task): 174 used_devices = {} 175 try: 176 self._dynamic_concurrent_execute(task, used_devices) 177 finally: 178 Scheduler.__reset_environment__(used_devices) 179 # generate reports 180 self._generate_task_report(task, used_devices) 181 182 def _host_test_execute(self, task): 183 """Execute host test""" 184 try: 185 # initial params 186 current_driver_threads = {} 187 test_drivers = task.test_drivers 188 message_queue = queue.Queue() 189 190 # execute test drivers 191 queue_monitor_thread = self._start_queue_monitor( 192 message_queue, test_drivers, current_driver_threads) 193 while test_drivers: 194 if len(current_driver_threads) > 5: 195 time.sleep(3) 196 continue 197 198 # clear remaining test drivers when scheduler is terminated 199 if not Scheduler.is_execute: 200 LOG.info("Clear test drivers") 201 self._clear_not_executed(task, test_drivers) 202 break 203 204 # get test driver and device 205 test_driver = test_drivers[0] 206 207 # display executing progress 208 self._display_executing_process(None, test_driver, 209 test_drivers) 210 211 # start driver thread 212 self._start_driver_thread(current_driver_threads, ( 213 None, message_queue, task, test_driver)) 214 test_drivers.pop(0) 215 216 # wait for all drivers threads finished and do kit teardown 217 while True: 218 if not queue_monitor_thread.is_alive(): 219 break 220 time.sleep(3) 221 222 finally: 223 # generate reports 224 self._generate_task_report(task) 225 226 def _dry_run_device_test_execute(self, task): 227 try: 228 # initial params 229 used_devices = {} 230 current_driver_threads = {} 231 test_drivers = task.test_drivers 232 message_queue = queue.Queue() 233 task_unused_env = [] 234 235 # execute test drivers 236 queue_monitor_thread = self._start_queue_monitor( 237 message_queue, test_drivers, current_driver_threads) 238 while test_drivers: 239 # clear remaining test drivers when scheduler is terminated 240 if not Scheduler.is_execute: 241 LOG.info("Clear test drivers") 242 self._clear_not_executed(task, test_drivers) 243 break 244 245 # get test driver and device 246 test_driver = test_drivers[0] 247 # get environment 248 try: 249 environment = self.__allocate_environment__( 250 task.config.__dict__, test_driver) 251 except DeviceError as exception: 252 self._handle_device_error(exception, task, test_drivers) 253 continue 254 255 if not Scheduler.is_execute: 256 if environment: 257 Scheduler.__free_environment__(environment) 258 continue 259 260 # start driver thread 261 thread_id = self._get_thread_id(current_driver_threads) 262 driver_thread = DriversDryRunThread(test_driver, task, environment, 263 message_queue) 264 driver_thread.setDaemon(True) 265 driver_thread.set_thread_id(thread_id) 266 driver_thread.start() 267 current_driver_threads.setdefault(thread_id, driver_thread) 268 269 test_drivers.pop(0) 270 271 # wait for all drivers threads finished and do kit teardown 272 while True: 273 if not queue_monitor_thread.is_alive(): 274 break 275 time.sleep(3) 276 277 self._do_taskkit_teardown(used_devices, task_unused_env) 278 finally: 279 LOG.debug("Removing report_path: {}".format(task.config.report_path)) 280 # delete reports 281 self.stop_task_logcat() 282 self.stop_encrypt_log() 283 shutil.rmtree(task.config.report_path) 284 285 def _generate_task_report(self, task, used_devices=None): 286 task_info = ExecInfo() 287 test_type = getattr(task.config, "testtype", []) 288 task_name = getattr(task.config, "task", "") 289 if task_name: 290 task_info.test_type = str(task_name).upper() 291 else: 292 task_info.test_type = ",".join(test_type) if test_type else "Test" 293 if used_devices: 294 serials = [] 295 platforms = [] 296 for serial, device in used_devices.items(): 297 serials.append(convert_serial(serial)) 298 platform = str(device.label).capitalize() 299 if platform not in platforms: 300 platforms.append(platform) 301 task_info.device_name = ",".join(serials) 302 task_info.platform = ",".join(platforms) 303 else: 304 task_info.device_name = "None" 305 task_info.platform = "None" 306 task_info.test_time = task.config.start_time 307 task_info.product_info = getattr(task, "product_info", "") 308 309 listeners = self._create_listeners(task) 310 for listener in listeners: 311 listener.__ended__(LifeCycle.TestTask, task_info, 312 test_type=task_info.test_type) 313 314 @classmethod 315 def _create_listeners(cls, task): 316 listeners = [] 317 # append log listeners 318 log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log) 319 for log_listener in log_listeners: 320 log_listener_instance = log_listener.__class__() 321 listeners.append(log_listener_instance) 322 # append report listeners 323 report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report) 324 for report_listener in report_listeners: 325 report_listener_instance = report_listener.__class__() 326 setattr(report_listener_instance, "report_path", 327 task.config.report_path) 328 listeners.append(report_listener_instance) 329 # append upload listeners 330 upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload) 331 for upload_listener in upload_listeners: 332 upload_listener_instance = upload_listener.__class__() 333 listeners.append(upload_listener_instance) 334 return listeners 335 336 @staticmethod 337 def _find_device_options(environment_config, options, test_source): 338 devices_option = [] 339 index = 1 340 for device_dict in environment_config: 341 label = device_dict.get("label", "") 342 required_manager = device_dict.get("type", "device") 343 required_manager = \ 344 required_manager if required_manager else "device" 345 if not label: 346 continue 347 device_option = DeviceSelectionOption(options, label, test_source) 348 device_dict.pop("type", None) 349 device_dict.pop("label", None) 350 device_option.required_manager = required_manager 351 device_option.extend_value = device_dict 352 device_option.source_file = \ 353 test_source.config_file or test_source.source_string 354 if hasattr(device_option, "env_index"): 355 device_option.env_index = index 356 index += 1 357 devices_option.append(device_option) 358 return devices_option 359 360 def __allocate_environment__(self, options, test_driver): 361 device_options = self.get_device_options(options, 362 test_driver[1].source) 363 environment = None 364 env_manager = EnvironmentManager() 365 while True: 366 if not Scheduler.is_execute: 367 break 368 environment = env_manager.apply_environment(device_options) 369 if len(environment.devices) == len(device_options): 370 return environment 371 else: 372 env_manager.release_environment(environment) 373 LOG.debug("'%s' is waiting available device", 374 test_driver[1].source.test_name) 375 if env_manager.check_device_exist(device_options): 376 continue 377 else: 378 LOG.debug("'%s' required %s devices, actually %s devices" 379 " were found" % (test_driver[1].source.test_name, 380 len(device_options), 381 len(environment.devices))) 382 raise DeviceError("The '%s' required device does not exist" 383 % test_driver[1].source.source_file, 384 error_no="00104") 385 386 return environment 387 388 @classmethod 389 def get_device_options(cls, options, test_source): 390 device_options = [] 391 config_file = test_source.config_file 392 environment_config = [] 393 from _core.testkit.json_parser import JsonParser 394 if test_source.source_string and is_config_str( 395 test_source.source_string): 396 json_config = JsonParser(test_source.source_string) 397 environment_config = json_config.get_environment() 398 device_options = cls._find_device_options( 399 environment_config, options, test_source) 400 elif config_file and os.path.exists(config_file): 401 json_config = JsonParser(test_source.config_file) 402 environment_config = json_config.get_environment() 403 device_options = cls._find_device_options( 404 environment_config, options, test_source) 405 406 device_options = cls._calculate_device_options( 407 device_options, environment_config, options, test_source) 408 409 if ConfigConst.component_mapper in options.keys(): 410 required_component = options.get(ConfigConst.component_mapper). \ 411 get(test_source.module_name, None) 412 for device_option in device_options: 413 device_option.required_component = required_component 414 return device_options 415 416 @staticmethod 417 def __free_environment__(environment): 418 env_manager = EnvironmentManager() 419 env_manager.release_environment(environment) 420 421 @staticmethod 422 def __reset_environment__(used_devices): 423 env_manager = EnvironmentManager() 424 env_manager.reset_environment(used_devices) 425 426 @classmethod 427 def _check_device_spt(cls, kit, driver_request, device): 428 kit_spt = cls._parse_property_value(ConfigConst.spt, 429 driver_request, kit) 430 if not kit_spt: 431 setattr(device, ConfigConst.task_state, False) 432 LOG.error("Spt is empty", error_no="00108") 433 return 434 if getattr(driver_request, ConfigConst.product_info, ""): 435 product_info = getattr(driver_request, 436 ConfigConst.product_info) 437 if not isinstance(product_info, dict): 438 LOG.warning("Product info should be dict, %s", 439 product_info) 440 setattr(device, ConfigConst.task_state, False) 441 return 442 device_spt = product_info.get("Security Patch", None) 443 if not device_spt or not \ 444 Scheduler.compare_spt_time(kit_spt, device_spt): 445 LOG.error("The device %s spt is %s, " 446 "and the test case spt is %s, " 447 "which does not meet the requirements" % 448 (device.device_sn, device_spt, kit_spt), 449 error_no="00116") 450 setattr(device, ConfigConst.task_state, False) 451 return 452 453 def _decc_task_setup(self, environment, task): 454 config = Config() 455 config.update(task.config.__dict__) 456 config.environment = environment 457 driver_request = Request(config=config) 458 459 if environment is None: 460 return False 461 462 for device in environment.devices: 463 if not getattr(device, ConfigConst.need_kit_setup, True): 464 LOG.debug("Device %s need kit setup is false" % device) 465 continue 466 467 # do task setup for device 468 kits_copy = copy.deepcopy(task.config.kits) 469 setattr(device, ConfigConst.task_kits, kits_copy) 470 for kit in getattr(device, ConfigConst.task_kits, []): 471 if not Scheduler.is_execute: 472 break 473 try: 474 kit.__setup__(device, request=driver_request) 475 except (ParamError, ExecuteTerminate, DeviceError, 476 LiteDeviceError, ValueError, TypeError, 477 SyntaxError, AttributeError) as exception: 478 error_no = getattr(exception, "error_no", "00000") 479 LOG.exception( 480 "Task setup device: %s, exception: %s" % ( 481 environment.__get_serial__(), 482 exception), exc_info=False, error_no=error_no) 483 if kit.__class__.__name__ == CKit.query and \ 484 device.label in [DeviceLabelType.ipcamera]: 485 self._check_device_spt(kit, driver_request, device) 486 LOG.debug("Set device %s need kit setup to false" % device) 487 setattr(device, ConfigConst.need_kit_setup, False) 488 489 for device in environment.devices: 490 if not getattr(device, ConfigConst.task_state, True): 491 return False 492 493 # set product_info to self.task 494 if getattr(driver_request, ConfigConst.product_info, "") and \ 495 not getattr(task, ConfigConst.product_info, ""): 496 product_info = getattr(driver_request, ConfigConst.product_info) 497 if not isinstance(product_info, dict): 498 LOG.warning("Product info should be dict, %s", 499 product_info) 500 else: 501 setattr(task, ConfigConst.product_info, product_info) 502 return True 503 504 def _dynamic_concurrent_execute(self, task, used_devices): 505 # initial params 506 current_driver_threads = {} 507 test_drivers = task.test_drivers 508 message_queue = queue.Queue() 509 task_unused_env = [] 510 511 # execute test drivers 512 queue_monitor_thread = self._start_queue_monitor( 513 message_queue, test_drivers, current_driver_threads) 514 while test_drivers: 515 # clear remaining test drivers when scheduler is terminated 516 if not Scheduler.is_execute: 517 LOG.info("Clear test drivers") 518 self._clear_not_executed(task, test_drivers) 519 break 520 521 # get test driver and device 522 test_driver = test_drivers[0] 523 524 if getattr(task.config, ConfigConst.history_report_path, ""): 525 module_name = test_driver[1].source.module_name 526 if not self.is_module_need_retry(task, module_name): 527 self._display_executing_process(None, test_driver, 528 test_drivers) 529 LOG.info("%s are passed, no need to retry" % module_name) 530 self._append_history_result(task, module_name) 531 LOG.info("") 532 test_drivers.pop(0) 533 continue 534 535 if getattr(task.config, ConfigConst.component_mapper, ""): 536 module_name = test_driver[1].source.module_name 537 self.component_task_setup(task, module_name) 538 539 # get environment 540 try: 541 environment = self.__allocate_environment__( 542 task.config.__dict__, test_driver) 543 except DeviceError as exception: 544 self._handle_device_error(exception, task, test_drivers) 545 continue 546 547 if not Scheduler.is_execute: 548 if environment: 549 Scheduler.__free_environment__(environment) 550 continue 551 552 if check_mode(ModeType.decc) or getattr( 553 task.config, ConfigConst.check_device, False): 554 LOG.info("Start to check environment: %s" % 555 environment.__get_serial__()) 556 status = self._decc_task_setup(environment, task) 557 if not status: 558 Scheduler.__free_environment__(environment) 559 task_unused_env.append(environment) 560 error_message = "Load Error[00116]" 561 self.report_not_executed(task.config.report_path, 562 [test_drivers[0]], 563 error_message, task) 564 test_drivers.pop(0) 565 continue 566 else: 567 LOG.info("Environment %s check success", 568 environment.__get_serial__()) 569 570 # display executing progress 571 self._display_executing_process(environment, test_driver, 572 test_drivers) 573 574 # add to used devices and set need_kit_setup attribute 575 self._append_used_devices(environment, used_devices) 576 577 # start driver thread 578 self._start_driver_thread(current_driver_threads, ( 579 environment, message_queue, task, test_driver)) 580 test_drivers.pop(0) 581 582 # wait for all drivers threads finished and do kit teardown 583 while True: 584 if not queue_monitor_thread.is_alive(): 585 break 586 time.sleep(3) 587 588 self._do_taskkit_teardown(used_devices, task_unused_env) 589 590 @classmethod 591 def _append_history_result(cls, task, module_name): 592 history_report_path = getattr( 593 task.config, ConfigConst.history_report_path, "") 594 from _core.report.result_reporter import ResultReporter 595 params = ResultReporter.get_task_info_params( 596 history_report_path) 597 598 if not params or not params[ReportConst.data_reports]: 599 LOG.debug("Task info record data reports is empty") 600 return 601 602 report_data_dict = dict(params[ReportConst.data_reports]) 603 if module_name not in report_data_dict.keys(): 604 module_name_ = str(module_name).split(".")[0] 605 if module_name_ not in report_data_dict.keys(): 606 LOG.error("%s not in data reports" % module_name) 607 return 608 module_name = module_name_ 609 610 from xdevice import SuiteReporter 611 if check_mode(ModeType.decc): 612 virtual_report_path, report_result = SuiteReporter. \ 613 get_history_result_by_module(module_name) 614 LOG.debug("Append history result: (%s, %s)" % ( 615 virtual_report_path, report_result)) 616 SuiteReporter.append_report_result( 617 (virtual_report_path, report_result)) 618 else: 619 history_execute_result = report_data_dict.get(module_name, "") 620 LOG.info("Start copy %s" % history_execute_result) 621 file_name = get_filename_extension(history_execute_result)[0] 622 if os.path.exists(history_execute_result): 623 result_dir = \ 624 os.path.join(task.config.report_path, "result") 625 os.makedirs(result_dir, exist_ok=True) 626 target_execute_result = "%s.xml" % os.path.join( 627 task.config.report_path, "result", file_name) 628 shutil.copyfile(history_execute_result, target_execute_result) 629 LOG.info("Copy %s to %s" % ( 630 history_execute_result, target_execute_result)) 631 else: 632 error_msg = "Copy failed! %s not exists!" % \ 633 history_execute_result 634 raise ParamError(error_msg) 635 636 def _handle_device_error(self, exception, task, test_drivers): 637 self._display_executing_process(None, test_drivers[0], test_drivers) 638 error_message = "%s: %s" % \ 639 (get_instance_name(exception), exception) 640 LOG.exception(error_message, exc_info=False, 641 error_no=exception.error_no) 642 if check_mode(ModeType.decc): 643 error_message = "Load Error[00104]" 644 self.report_not_executed(task.config.report_path, [test_drivers[0]], 645 error_message, task) 646 647 LOG.info("") 648 test_drivers.pop(0) 649 650 @classmethod 651 def _clear_not_executed(cls, task, test_drivers): 652 if Scheduler.mode != ModeType.decc: 653 # clear all 654 test_drivers.clear() 655 return 656 # The result is reported only in DECC mode, and also clear all. 657 LOG.error("Case no run: task execution terminated!", error_no="00300") 658 error_message = "Execute Terminate[00300]" 659 cls.report_not_executed(task.config.report_path, test_drivers, 660 error_message) 661 test_drivers.clear() 662 663 @classmethod 664 def report_not_executed(cls, report_path, test_drivers, error_message, 665 task=None): 666 # traversing list to get remained elements 667 for test_driver in test_drivers: 668 # get report file 669 if task and getattr(task.config, "testdict", ""): 670 report_file = os.path.join(get_sub_path( 671 test_driver[1].source.source_file), 672 "%s.xml" % test_driver[1].source.test_name) 673 else: 674 report_file = os.path.join( 675 report_path, "result", 676 "%s.xml" % test_driver[1].source.module_name) 677 678 # get report name 679 report_name = test_driver[1].source.test_name if \ 680 not test_driver[1].source.test_name.startswith("{") \ 681 else "report" 682 683 # get module name 684 module_name = test_driver[1].source.module_name 685 686 # here, normally create empty report and then upload result 687 check_result_report(report_path, report_file, error_message, 688 report_name, module_name) 689 690 def _start_driver_thread(self, current_driver_threads, thread_params): 691 environment, message_queue, task, test_driver = thread_params 692 thread_id = self._get_thread_id(current_driver_threads) 693 driver_thread = DriversThread(test_driver, task, environment, 694 message_queue) 695 driver_thread.setDaemon(True) 696 driver_thread.set_thread_id(thread_id) 697 driver_thread.set_listeners(self._create_listeners(task)) 698 driver_thread.start() 699 current_driver_threads.setdefault(thread_id, driver_thread) 700 701 @classmethod 702 def _do_taskkit_teardown(cls, used_devices, task_unused_env): 703 for device in used_devices.values(): 704 if getattr(device, ConfigConst.need_kit_setup, True): 705 continue 706 707 for kit in getattr(device, ConfigConst.task_kits, []): 708 try: 709 kit.__teardown__(device) 710 except Exception as error: 711 LOG.debug("Do task kit teardown: %s" % error) 712 setattr(device, ConfigConst.task_kits, []) 713 setattr(device, ConfigConst.need_kit_setup, True) 714 715 for environment in task_unused_env: 716 for device in environment.devices: 717 setattr(device, ConfigConst.task_state, True) 718 setattr(device, ConfigConst.need_kit_setup, True) 719 720 def _display_executing_process(self, environment, test_driver, 721 test_drivers): 722 source_content = test_driver[1].source.source_file or \ 723 test_driver[1].source.source_string 724 if environment is None: 725 LOG.info("[%d / %d] Executing: %s, Driver: %s" % 726 (self.test_number - len(test_drivers) + 1, 727 self.test_number, source_content, 728 test_driver[1].source.test_type)) 729 return 730 731 LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" % 732 (self.test_number - len(test_drivers) + 1, 733 self.test_number, source_content, 734 environment.__get_serial__(), 735 test_driver[1].source.test_type)) 736 737 @classmethod 738 def _get_thread_id(cls, current_driver_threads): 739 thread_id = get_cst_time().strftime( 740 '%Y-%m-%d-%H-%M-%S-%f') 741 while thread_id in current_driver_threads.keys(): 742 thread_id = get_cst_time().strftime( 743 '%Y-%m-%d-%H-%M-%S-%f') 744 return thread_id 745 746 @classmethod 747 def _append_used_devices(cls, environment, used_devices): 748 if environment is not None: 749 for device in environment.devices: 750 device_serial = device.__get_serial__() if device else "None" 751 if device_serial and device_serial not in used_devices.keys(): 752 used_devices[device_serial] = device 753 754 @staticmethod 755 def _start_queue_monitor(message_queue, test_drivers, 756 current_driver_threads): 757 queue_monitor_thread = QueueMonitorThread(message_queue, 758 current_driver_threads, 759 test_drivers) 760 queue_monitor_thread.setDaemon(True) 761 queue_monitor_thread.start() 762 return queue_monitor_thread 763 764 def exec_command(self, command, options): 765 """ 766 Directly executes a command without adding it to the command queue. 767 """ 768 if command != "run": 769 raise ParamError("unsupported command action: %s" % command, 770 error_no="00100") 771 exec_type = options.exectype 772 if exec_type in [TestExecType.device_test, TestExecType.host_test, 773 TestExecType.host_driven_test]: 774 self._exec_task(options) 775 else: 776 LOG.error("Unsupported execution type '%s'" % exec_type, 777 error_no="00100") 778 779 return 780 781 def _exec_task(self, options): 782 """ 783 Directly allocates a device and execute a device test. 784 """ 785 try: 786 self.check_auto_retry(options) 787 task = self.__discover__(options.__dict__) 788 self.__execute__(task) 789 except (ParamError, ValueError, TypeError, SyntaxError, 790 AttributeError) as exception: 791 error_no = getattr(exception, "error_no", "00000") 792 LOG.exception("%s: %s" % (get_instance_name(exception), exception), 793 exc_info=False, error_no=error_no) 794 if Scheduler.upload_address: 795 Scheduler.upload_unavailable_result(str(exception.args)) 796 Scheduler.upload_report_end() 797 finally: 798 self.stop_task_logcat() 799 self.stop_encrypt_log() 800 self.start_auto_retry() 801 802 @classmethod 803 def _reset_environment(cls, environment="", config_file=""): 804 env_manager = EnvironmentManager() 805 env_manager.env_stop() 806 EnvironmentManager(environment, config_file) 807 808 @classmethod 809 def _restore_environment(cls): 810 env_manager = EnvironmentManager() 811 env_manager.env_stop() 812 EnvironmentManager() 813 814 @classmethod 815 def start_task_log(cls, log_path): 816 tool_file_name = "task_log.log" 817 tool_log_file = os.path.join(log_path, tool_file_name) 818 add_task_file_handler(tool_log_file) 819 820 @classmethod 821 def start_encrypt_log(cls, log_path): 822 from _core.report.encrypt import check_pub_key_exist 823 if check_pub_key_exist(): 824 encrypt_file_name = "task_log.ept" 825 encrypt_log_file = os.path.join(log_path, encrypt_file_name) 826 add_encrypt_file_handler(encrypt_log_file) 827 828 @classmethod 829 def stop_task_logcat(cls): 830 remove_task_file_handler() 831 832 @classmethod 833 def stop_encrypt_log(cls): 834 remove_encrypt_file_handler() 835 836 @staticmethod 837 def _find_test_root_descriptor(config): 838 if getattr(config, ConfigConst.task, None) or \ 839 getattr(config, ConfigConst.testargs, None): 840 Scheduler._pre_component_test(config) 841 842 if getattr(config, ConfigConst.subsystems, "") or \ 843 getattr(config, ConfigConst.parts, "") or \ 844 getattr(config, ConfigConst.component_base_kit, ""): 845 uid = unique_id("Scheduler", "component") 846 if config.subsystems or config.parts: 847 test_set = (config.subsystems, config.parts) 848 else: 849 kit = getattr(config, ConfigConst.component_base_kit) 850 test_set = kit.get_white_list() 851 852 root = Descriptor(uuid=uid, name="component", 853 source=TestSetSource(test_set), 854 container=True) 855 856 root.children = find_test_descriptors(config) 857 return root 858 # read test list from testdict 859 if getattr(config, ConfigConst.testdict, "") != "" and getattr( 860 config, ConfigConst.testfile, "") == "": 861 uid = unique_id("Scheduler", "testdict") 862 root = Descriptor(uuid=uid, name="testdict", 863 source=TestSetSource(config.testdict), 864 container=True) 865 root.children = find_testdict_descriptors(config) 866 return root 867 868 # read test list from testfile, testlist or task 869 test_set = getattr(config, ConfigConst.testfile, "") or getattr( 870 config, ConfigConst.testlist, "") or getattr( 871 config, ConfigConst.task, "") or getattr( 872 config, ConfigConst.testcase) 873 # read test list from testfile, testlist or task 874 test_set = getattr(config, "testfile", "") or getattr( 875 config, "testlist", "") or getattr(config, "task", "") or getattr( 876 config, "testcase") 877 if test_set: 878 fname, _ = get_filename_extension(test_set) 879 uid = unique_id("Scheduler", fname) 880 root = Descriptor(uuid=uid, name=fname, 881 source=TestSetSource(test_set), container=True) 882 root.children = find_test_descriptors(config) 883 return root 884 else: 885 raise ParamError("no test file, list, dict, case or task found", 886 error_no="00102") 887 888 @classmethod 889 def terminate_cmd_exec(cls): 890 Scheduler.is_execute = False 891 Scheduler.auto_retry = -1 892 LOG.info("Start to terminate execution") 893 return Scheduler.terminate_result.get() 894 895 @classmethod 896 def upload_case_result(cls, upload_param): 897 if not Scheduler.upload_address: 898 return 899 case_id, result, error, start_time, end_time, report_path = \ 900 upload_param 901 if error and len(error) > MAX_VISIBLE_LENGTH: 902 error = "%s..." % error[:MAX_VISIBLE_LENGTH] 903 LOG.info( 904 "Get upload params: %s, %s, %s, %s, %s, %s" % ( 905 case_id, result, error, start_time, end_time, report_path)) 906 if Scheduler.proxy is not None: 907 Scheduler.proxy.upload_result(case_id, result, error, start_time, 908 end_time, report_path) 909 else: 910 LOG.debug("There is no proxy, can't upload case result") 911 912 @classmethod 913 def upload_module_result(cls, exec_message): 914 if not Scheduler.is_execute: 915 return 916 result_file = exec_message.get_result() 917 request = exec_message.get_request() 918 919 test_name = request.root.source.test_name 920 if not result_file or not os.path.exists(result_file): 921 LOG.error("%s result not exists", test_name, error_no="00200") 922 return 923 924 test_type = request.root.source.test_type 925 LOG.info("Need upload result: %s, test type: %s" % 926 (result_file, test_type)) 927 upload_params, _, _ = cls._get_upload_params(result_file, request) 928 if not upload_params: 929 LOG.error("%s no test case result to upload" % result_file, 930 error_no="00201") 931 return 932 LOG.info("Need upload %s case" % len(upload_params)) 933 upload_suite = [] 934 for upload_param in upload_params: 935 case_id, result, error, start_time, end_time, report_path = \ 936 upload_param 937 case = {"caseid": case_id, "result": result, "error": error, 938 "start": start_time, "end": end_time, 939 "report": report_path} 940 LOG.info("Case info: %s", case) 941 upload_suite.append(case) 942 if Scheduler.proxy is not None: 943 Scheduler.proxy.upload_batch(upload_suite) 944 else: 945 LOG.debug("There is no proxy, can't upload module result") 946 947 @classmethod 948 def _get_upload_params(cls, result_file, request): 949 upload_params = [] 950 report_path = result_file 951 testsuites_element = DataHelper.parse_data_report(report_path) 952 start_time, end_time = cls._get_time(testsuites_element) 953 if request.get_test_type() == HostDrivenTestType.device_test: 954 for model_element in testsuites_element: 955 case_id = model_element.get(ReportConstant.name, "") 956 case_result, error = cls.get_script_result(model_element) 957 if error and len(error) > MAX_VISIBLE_LENGTH: 958 error = "$s..." % error[:MAX_VISIBLE_LENGTH] 959 upload_params.append( 960 (case_id, case_result, error, start_time, 961 end_time, request.config.report_path,)) 962 else: 963 for testsuite_element in testsuites_element: 964 if check_mode(ModeType.developer): 965 module_name = str(get_filename_extension( 966 report_path)[0]).split(".")[0] 967 else: 968 module_name = testsuite_element.get(ReportConstant.name, 969 "none") 970 for case_element in testsuite_element: 971 case_id = cls._get_case_id(case_element, module_name) 972 case_result, error = cls._get_case_result(case_element) 973 if error and len(error) > MAX_VISIBLE_LENGTH: 974 error = "%s..." % error[:MAX_VISIBLE_LENGTH] 975 if case_result == "Ignored": 976 LOG.info("Get upload params: %s result is ignored", 977 case_id) 978 continue 979 upload_params.append( 980 (case_id, case_result, error, start_time, 981 end_time, request.config.report_path,)) 982 return upload_params, start_time, end_time 983 984 @classmethod 985 def get_script_result(cls, model_element): 986 disabled = int(model_element.get(ReportConstant.disabled)) if \ 987 model_element.get(ReportConstant.disabled, "") else 0 988 failures = int(model_element.get(ReportConstant.failures)) if \ 989 model_element.get(ReportConstant.failures, "") else 0 990 errors = int(model_element.get(ReportConstant.errors)) if \ 991 model_element.get(ReportConstant.errors, "") else 0 992 unavailable = int(model_element.get(ReportConstant.unavailable)) if \ 993 model_element.get(ReportConstant.unavailable, "") else 0 994 if failures > 0 or errors > 0: 995 result = "Failed" 996 elif disabled > 0 or unavailable > 0: 997 result = "Unavailable" 998 else: 999 result = "Passed" 1000 1001 if result == "Passed": 1002 return result, "" 1003 if Scheduler.mode == ModeType.decc: 1004 result = "Failed" 1005 1006 error_msg = model_element.get(ReportConstant.message, "") 1007 if not error_msg and len(model_element) > 0: 1008 error_msg = model_element[0].get(ReportConstant.message, "") 1009 if not error_msg and len(model_element[0]) > 0: 1010 error_msg = model_element[0][0].get(ReportConstant.message, "") 1011 return result, error_msg 1012 1013 @classmethod 1014 def _get_case_id(cls, case_element, package_name): 1015 class_name = case_element.get(ReportConstant.class_name, "none") 1016 method_name = case_element.get(ReportConstant.name, "none") 1017 case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name, 1018 class_name, method_name) 1019 return case_id 1020 1021 @classmethod 1022 def _get_case_result(cls, case_element): 1023 # get result 1024 case = Case() 1025 case.status = case_element.get(ReportConstant.status, "") 1026 case.result = case_element.get(ReportConstant.result, "") 1027 if case_element.get(ReportConstant.message, ""): 1028 case.message = case_element.get(ReportConstant.message) 1029 if len(case_element) > 0: 1030 if not case.result: 1031 case.result = ReportConstant.false 1032 case.message = case_element[0].get(ReportConstant.message) 1033 if case.is_passed(): 1034 result = "Passed" 1035 elif case.is_failed(): 1036 result = "Failed" 1037 elif case.is_blocked(): 1038 result = "Blocked" 1039 elif case.is_ignored(): 1040 result = "Ignored" 1041 else: 1042 result = "Unavailable" 1043 return result, case.message 1044 1045 @classmethod 1046 def _get_time(cls, testsuite_element): 1047 start_time = testsuite_element.get(ReportConstant.start_time, "") 1048 end_time = testsuite_element.get(ReportConstant.end_time, "") 1049 try: 1050 if start_time and end_time: 1051 start_time = int(time.mktime(time.strptime( 1052 start_time, ReportConstant.time_format)) * 1000) 1053 end_time = int(time.mktime(time.strptime( 1054 end_time, ReportConstant.time_format)) * 1000) 1055 else: 1056 timestamp = str(testsuite_element.get( 1057 ReportConstant.time_stamp, "")).replace("T", " ") 1058 cost_time = testsuite_element.get(ReportConstant.time, "") 1059 if timestamp and cost_time: 1060 try: 1061 end_time = int(time.mktime(time.strptime( 1062 timestamp, ReportConstant.time_format)) * 1000) 1063 except ArithmeticError as error: 1064 LOG.error("Get time error %s" % error) 1065 end_time = int(time.time() * 1000) 1066 start_time = int(end_time - float(cost_time) * 1000) 1067 else: 1068 current_time = int(time.time() * 1000) 1069 start_time, end_time = current_time, current_time 1070 except ArithmeticError as error: 1071 LOG.error("Get time error %s" % error) 1072 current_time = int(time.time() * 1000) 1073 start_time, end_time = current_time, current_time 1074 return start_time, end_time 1075 1076 @classmethod 1077 def upload_task_result(cls, task, error_message=""): 1078 if not Scheduler.task_name: 1079 LOG.info("No need upload summary report") 1080 return 1081 1082 summary_data_report = os.path.join(task.config.report_path, 1083 ReportConstant.summary_data_report) 1084 if not os.path.exists(summary_data_report): 1085 Scheduler.upload_unavailable_result(str( 1086 error_message) or "summary report not exists", 1087 task.config.report_path) 1088 return 1089 1090 task_element = ElementTree.parse(summary_data_report).getroot() 1091 start_time, end_time = cls._get_time(task_element) 1092 task_result = cls._get_task_result(task_element) 1093 error_msg = "" 1094 for child in task_element: 1095 if child.get(ReportConstant.message, ""): 1096 error_msg = "{}{}".format( 1097 error_msg, "%s;" % child.get(ReportConstant.message)) 1098 if error_msg: 1099 error_msg = error_msg[:-1] 1100 cls.upload_case_result((Scheduler.task_name, task_result, 1101 error_msg, start_time, end_time, 1102 task.config.report_path)) 1103 1104 @classmethod 1105 def _get_task_result(cls, task_element): 1106 failures = int(task_element.get(ReportConstant.failures, 0)) 1107 errors = int(task_element.get(ReportConstant.errors, 0)) 1108 disabled = int(task_element.get(ReportConstant.disabled, 0)) 1109 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1110 if disabled > 0: 1111 task_result = "Blocked" 1112 elif errors > 0 or failures > 0: 1113 task_result = "Failed" 1114 elif unavailable > 0: 1115 task_result = "Unavailable" 1116 else: 1117 task_result = "Passed" 1118 return task_result 1119 1120 @classmethod 1121 def upload_unavailable_result(cls, error_msg, report_path=""): 1122 start_time = int(time.time() * 1000) 1123 Scheduler.upload_case_result((Scheduler.task_name, "Unavailable", 1124 error_msg, start_time, start_time, 1125 report_path)) 1126 1127 @classmethod 1128 def upload_report_end(cls): 1129 if getattr(cls, "tmp_json", None): 1130 os.remove(cls.tmp_json) 1131 del cls.tmp_json 1132 LOG.info("Upload report end") 1133 if Scheduler.proxy is not None: 1134 Scheduler.proxy.report_end() 1135 else: 1136 LOG.debug("There is no proxy, can't upload report end") 1137 1138 @classmethod 1139 def is_module_need_retry(cls, task, module_name): 1140 failed_flag = False 1141 if check_mode(ModeType.decc): 1142 from xdevice import SuiteReporter 1143 for module, failed in SuiteReporter.get_failed_case_list(): 1144 if module_name == module or str(module_name).split( 1145 ".")[0] == module: 1146 failed_flag = True 1147 break 1148 else: 1149 from xdevice import ResultReporter 1150 history_report_path = \ 1151 getattr(task.config, ConfigConst.history_report_path, "") 1152 params = ResultReporter.get_task_info_params(history_report_path) 1153 if params and params[ReportConst.unsuccessful_params]: 1154 if dict(params[ReportConst.unsuccessful_params]).get(module_name, []): 1155 failed_flag = True 1156 elif dict(params[ReportConst.unsuccessful_params]).get(str(module_name).split(".")[0], []): 1157 failed_flag = True 1158 return failed_flag 1159 1160 @classmethod 1161 def compare_spt_time(cls, kit_spt, device_spt): 1162 if not kit_spt or not device_spt: 1163 return False 1164 try: 1165 kit_time = str(kit_spt).split("-")[:2] 1166 device_time = str(device_spt).split("-")[:2] 1167 k_spt = datetime.datetime.strptime( 1168 "-".join(kit_time), "%Y-%m") 1169 d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m") 1170 except ValueError as value_error: 1171 LOG.debug("Date format is error, %s" % value_error.args) 1172 return False 1173 month_interval = int(k_spt.month) - int(d_spt.month) 1174 year_interval = int(k_spt.year) - int(d_spt.year) 1175 LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)" 1176 % (k_spt.year, k_spt.month, d_spt.year, d_spt.month)) 1177 if year_interval < 0: 1178 return True 1179 if year_interval == 0 and month_interval in range(-11, 3): 1180 return True 1181 if year_interval == 1 and month_interval + 12 in (1, 2): 1182 return True 1183 1184 @classmethod 1185 def _parse_property_value(cls, property_name, driver_request, kit): 1186 test_args = copy.deepcopy( 1187 driver_request.config.get(ConfigConst.testargs, dict())) 1188 property_value = "" 1189 if ConfigConst.pass_through in test_args.keys(): 1190 import json 1191 pt_dict = json.loads(test_args.get(ConfigConst.pass_through, "")) 1192 property_value = pt_dict.get(property_name, None) 1193 elif property_name in test_args.keys: 1194 property_value = test_args.get(property_name, None) 1195 return property_value if property_value else \ 1196 kit.properties.get(property_name, None) 1197 1198 @classmethod 1199 def _calculate_device_options(cls, device_options, environment_config, 1200 options, test_source): 1201 # calculate difference 1202 diff_value = len(environment_config) - len(device_options) 1203 if device_options and diff_value == 0: 1204 return device_options 1205 1206 else: 1207 diff_value = diff_value if diff_value else 1 1208 if str(test_source.source_file).endswith(".bin"): 1209 device_option = DeviceSelectionOption( 1210 options, DeviceLabelType.ipcamera, test_source) 1211 else: 1212 device_option = DeviceSelectionOption( 1213 options, None, test_source) 1214 1215 device_option.source_file = \ 1216 test_source.source_file or test_source.source_string 1217 device_option.required_manager = "device" 1218 device_options.extend([device_option] * diff_value) 1219 LOG.debug("Assign device options and it's length is %s" 1220 % len(device_options)) 1221 return device_options 1222 1223 @classmethod 1224 def update_test_type_in_source(cls, key, value): 1225 LOG.debug("update test type dict in source") 1226 TestDictSource.test_type[key] = value 1227 1228 @classmethod 1229 def update_ext_type_in_source(cls, key, value): 1230 LOG.debug("update ext type dict in source") 1231 TestDictSource.exe_type[key] = value 1232 1233 @classmethod 1234 def clear_test_dict_source(cls): 1235 TestDictSource.clear() 1236 1237 @classmethod 1238 def reset_test_dict_source(cls): 1239 TestDictSource.reset() 1240 1241 @classmethod 1242 def _pre_component_test(cls, config): 1243 if not config.kits: 1244 return 1245 cur_kit = None 1246 for kit in config.kits: 1247 if kit.__class__.__name__ == CKit.component: 1248 cur_kit = kit 1249 break 1250 if not cur_kit: 1251 return 1252 get_white_list = getattr(cur_kit, "get_white_list", None) 1253 if not callable(get_white_list): 1254 return 1255 subsystems, parts = get_white_list() 1256 if not subsystems and not parts: 1257 return 1258 setattr(config, ConfigConst.component_base_kit, cur_kit) 1259 1260 @classmethod 1261 def component_task_setup(cls, task, module_name): 1262 component_kit = task.config.get(ConfigConst.component_base_kit, None) 1263 if not component_kit: 1264 # only -p -s .you do not care about the components that can be 1265 # supported. you only want to run the use cases of the current 1266 # component 1267 return 1268 LOG.debug("Start component task setup") 1269 _component_mapper = task.config.get(ConfigConst.component_mapper) 1270 _subsystem, _part = _component_mapper.get(module_name) 1271 1272 is_hit = False 1273 # find in cache. if not find, update cache 1274 cache_subsystem, cache_part = component_kit.get_cache() 1275 if _subsystem in cache_subsystem or _part in cache_subsystem: 1276 is_hit = True 1277 if not is_hit: 1278 env_manager = EnvironmentManager() 1279 for _, manager in env_manager.managers.items(): 1280 if getattr(manager, "devices_list", []): 1281 for device in manager.devices_list: 1282 component_kit.__setup__(device) 1283 cache_subsystem, cache_part = component_kit.get_cache() 1284 if _subsystem in cache_subsystem or _part in cache_subsystem: 1285 is_hit = True 1286 if not is_hit: 1287 LOG.warning("%s are skipped, no suitable component found. " 1288 "Require subsystem=%s part=%s, no device match this" 1289 % (module_name, _subsystem, _part)) 1290 1291 @classmethod 1292 def start_auto_retry(cls): 1293 if not Scheduler.is_need_auto_retry: 1294 Scheduler.auto_retry = -1 1295 LOG.debug("No need auto retry") 1296 return 1297 if Scheduler.auto_retry > 0: 1298 Scheduler.auto_retry -= 1 1299 if Scheduler.auto_retry == 0: 1300 Scheduler.auto_retry = -1 1301 from _core.command.console import Console 1302 console = Console() 1303 console.command_parser("run --retry") 1304 1305 @classmethod 1306 def check_auto_retry(cls, options): 1307 if Scheduler.auto_retry < 0 and int(getattr(options, ConfigConst.auto_retry, 0)) > 0: 1308 value = int(getattr(options, ConfigConst.auto_retry, 0)) 1309 Scheduler.auto_retry = value if value <= 10 else 10 1310