1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import datetime 21import os 22import queue 23import time 24import uuid 25from xml.etree import ElementTree 26 27from _core.utils import unique_id 28from _core.utils import check_mode 29from _core.utils import get_sub_path 30from _core.utils import get_filename_extension 31from _core.utils import convert_serial 32from _core.utils import get_instance_name 33from _core.utils import is_config_str 34from _core.utils import check_result_report 35from _core.environment.manager_env import EnvironmentManager 36from _core.environment.manager_env import DeviceSelectionOption 37from _core.exception import ParamError 38from _core.exception import ExecuteTerminate 39from _core.exception import LiteDeviceError 40from _core.exception import DeviceError 41from _core.interface import LifeCycle 42from _core.executor.request import Request 43from _core.executor.request import Task 44from _core.executor.request import Descriptor 45from _core.plugin import get_plugin 46from _core.plugin import Plugin 47from _core.plugin import Config 48from _core.report.reporter_helper import ExecInfo 49from _core.report.reporter_helper import ReportConstant 50from _core.report.reporter_helper import Case 51from _core.report.reporter_helper import DataHelper 52from _core.constants import TestExecType 53from _core.constants import CKit 54from _core.constants import ModeType 55from _core.constants import DeviceLabelType 56from _core.constants import SchedulerType 57from _core.constants import ListenerType 58from _core.constants import ConfigConst 59from _core.executor.concurrent import DriversThread 60from _core.executor.concurrent import QueueMonitorThread 61from _core.executor.source import TestSetSource 62from _core.executor.source import find_test_descriptors 63from _core.executor.source import find_testdict_descriptors 64from _core.executor.source import TestDictSource 65from _core.logger import platform_logger 66from _core.logger import add_task_file_handler 67from _core.logger import remove_task_file_handler 68from _core.logger import add_encrypt_file_handler 69from _core.logger import remove_encrypt_file_handler 70 71__all__ = ["Scheduler"] 72LOG = platform_logger("Scheduler") 73 74 75@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler) 76class Scheduler(object): 77 """ 78 The Scheduler is the main entry point for client code that wishes to 79 discover and execute tests. 80 """ 81 # factory params 82 is_execute = True 83 terminate_result = queue.Queue() 84 upload_address = "" 85 task_type = "" 86 task_name = "" 87 mode = "" 88 proxy = None 89 90 # command_queue to store test commands 91 command_queue = [] 92 max_command_num = 50 93 # the number of tests in current task 94 test_number = 0 95 96 def __discover__(self, args): 97 """discover task to execute""" 98 config = Config() 99 config.update(args) 100 task = Task(drivers=[]) 101 task.init(config) 102 103 TestDictSource.reset() 104 root_descriptor = self._find_test_root_descriptor(task.config) 105 task.set_root_descriptor(root_descriptor) 106 return task 107 108 def __execute__(self, task): 109 error_message = "" 110 try: 111 Scheduler.is_execute = True 112 if Scheduler.command_queue: 113 LOG.debug("Run command: %s" % Scheduler.command_queue[-1]) 114 run_command = Scheduler.command_queue.pop() 115 task_id = str(uuid.uuid1()).split("-")[0] 116 Scheduler.command_queue.append((task_id, run_command, 117 task.config.report_path)) 118 if len(Scheduler.command_queue) > self.max_command_num: 119 Scheduler.command_queue.pop(0) 120 121 if getattr(task.config, ConfigConst.test_environment, ""): 122 self._reset_environment(task.config.get( 123 ConfigConst.test_environment, "")) 124 elif getattr(task.config, ConfigConst.configfile, ""): 125 self._reset_environment(config_file=task.config.get( 126 ConfigConst.configfile, "")) 127 128 # do with the count of repeat about a task 129 if getattr(task.config, ConfigConst.repeat, 0) > 0: 130 drivers_list = list() 131 for repeat_index in range(task.config.repeat): 132 for driver_index in range(len(task.test_drivers)): 133 drivers_list.append( 134 copy.deepcopy(task.test_drivers[driver_index])) 135 task.test_drivers = drivers_list 136 137 self.test_number = len(task.test_drivers) 138 139 if task.config.exectype == TestExecType.device_test: 140 self._device_test_execute(task) 141 elif task.config.exectype == TestExecType.host_test: 142 self._host_test_execute(task) 143 else: 144 LOG.info("exec type %s is bypassed" % task.config.exectype) 145 146 except (ParamError, ValueError, TypeError, SyntaxError, AttributeError, 147 DeviceError, LiteDeviceError, ExecuteTerminate) as exception: 148 error_no = getattr(exception, "error_no", "") 149 error_message = "%s[%s]" % (str(exception), error_no) \ 150 if error_no else str(exception) 151 error_no = error_no if error_no else "00000" 152 LOG.exception(exception, exc_info=True, error_no=error_no) 153 154 finally: 155 Scheduler._clear_test_dict_source() 156 if getattr(task.config, ConfigConst.test_environment, "") or \ 157 getattr(task.config, ConfigConst.configfile, ""): 158 self._restore_environment() 159 160 if Scheduler.upload_address: 161 Scheduler.upload_task_result(task, error_message) 162 Scheduler.upload_report_end() 163 164 def _device_test_execute(self, task): 165 used_devices = {} 166 try: 167 self._dynamic_concurrent_execute(task, used_devices) 168 finally: 169 # generate reports 170 self._generate_task_report(task, used_devices) 171 172 def _host_test_execute(self, task): 173 """execute host test""" 174 try: 175 # initial params 176 current_driver_threads = {} 177 test_drivers = task.test_drivers 178 message_queue = queue.Queue() 179 180 # execute test drivers 181 queue_monitor_thread = self._start_queue_monitor( 182 message_queue, test_drivers, current_driver_threads) 183 while test_drivers: 184 if len(current_driver_threads) > 5: 185 time.sleep(3) 186 continue 187 188 # clear remaining test drivers when scheduler is terminated 189 if not Scheduler.is_execute: 190 LOG.info("clear test drivers") 191 self._clear_not_executed(task, test_drivers) 192 break 193 194 # get test driver and device 195 test_driver = test_drivers[0] 196 197 # display executing progress 198 self._display_executing_process(None, test_driver, 199 test_drivers) 200 201 # start driver thread 202 self._start_driver_thread(current_driver_threads, ( 203 None, message_queue, task, test_driver)) 204 test_drivers.pop(0) 205 206 # wait for all drivers threads finished and do kit teardown 207 while True: 208 if not queue_monitor_thread.is_alive(): 209 break 210 time.sleep(3) 211 212 finally: 213 # generate reports 214 self._generate_task_report(task) 215 216 def _generate_task_report(self, task, used_devices=None): 217 task_info = ExecInfo() 218 test_type = getattr(task.config, "testtype", []) 219 task_name = getattr(task.config, "task", "") 220 if task_name: 221 task_info.test_type = str(task_name).upper() 222 else: 223 task_info.test_type = ",".join(test_type) if test_type else "Test" 224 if used_devices: 225 serials = [] 226 platforms = [] 227 for serial, device in used_devices.items(): 228 serials.append(convert_serial(serial)) 229 platform = str(device.label).capitalize() 230 if platform not in platforms: 231 platforms.append(platform) 232 task_info.device_name = ",".join(serials) 233 task_info.platform = ",".join(platforms) 234 else: 235 task_info.device_name = "None" 236 task_info.platform = "None" 237 task_info.test_time = task.config.start_time 238 task_info.product_info = getattr(task, "product_info", "") 239 240 listeners = self._create_listeners(task) 241 for listener in listeners: 242 listener.__ended__(LifeCycle.TestTask, task_info, 243 test_type=task_info.test_type) 244 245 @classmethod 246 def _create_listeners(cls, task): 247 listeners = [] 248 # append log listeners 249 log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log) 250 for log_listener in log_listeners: 251 log_listener_instance = log_listener.__class__() 252 listeners.append(log_listener_instance) 253 # append report listeners 254 report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report) 255 for report_listener in report_listeners: 256 report_listener_instance = report_listener.__class__() 257 setattr(report_listener_instance, "report_path", 258 task.config.report_path) 259 listeners.append(report_listener_instance) 260 # append upload listeners 261 upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload) 262 for upload_listener in upload_listeners: 263 upload_listener_instance = upload_listener.__class__() 264 listeners.append(upload_listener_instance) 265 return listeners 266 267 @staticmethod 268 def _find_device_options(environment_config, options, test_source): 269 devices_option = [] 270 for device_dict in environment_config: 271 label = device_dict.get("label", "") 272 if not label: 273 continue 274 device_option = DeviceSelectionOption(options, label, test_source) 275 device_dict.pop("type", None) 276 device_dict.pop("label", None) 277 device_option.extend_value = device_dict 278 device_option.source_file = test_source.config_file or \ 279 test_source.source_string 280 devices_option.append(device_option) 281 return devices_option 282 283 def __allocate_environment__(self, options, test_driver): 284 device_options = self.get_device_options(options, 285 test_driver[1].source) 286 environment = None 287 env_manager = EnvironmentManager() 288 while True: 289 if not Scheduler.is_execute: 290 break 291 environment = env_manager.apply_environment(device_options) 292 if len(environment.devices) == len(device_options): 293 return environment 294 else: 295 env_manager.release_environment(environment) 296 LOG.debug("'%s' is waiting available device", 297 test_driver[1].source.test_name) 298 if env_manager.check_device_exist(device_options): 299 continue 300 else: 301 raise DeviceError("The '%s' required device does not exist" 302 % test_driver[1].source.source_file, 303 error_no="00104") 304 305 return environment 306 307 @classmethod 308 def get_device_options(cls, options, test_source): 309 device_options = [] 310 config_file = test_source.config_file 311 312 from _core.testkit.json_parser import JsonParser 313 if test_source.source_string and is_config_str( 314 test_source.source_string): 315 json_config = JsonParser(test_source.source_string) 316 device_options = cls._find_device_options( 317 json_config.get_environment(), options, test_source) 318 elif config_file and os.path.exists(config_file): 319 json_config = JsonParser(test_source.config_file) 320 device_options = cls._find_device_options( 321 json_config.get_environment(), options, test_source) 322 323 if not device_options: 324 if str(test_source.source_file).endswith(".bin"): 325 device_option = DeviceSelectionOption( 326 options, DeviceLabelType.ipcamera, test_source) 327 else: 328 device_option = DeviceSelectionOption( 329 options, None, test_source) 330 device_option.source_file = test_source.source_file or \ 331 test_source.source_string 332 device_options.append(device_option) 333 334 if ConfigConst.component_mapper in options.keys(): 335 required_component = options.get(ConfigConst.component_mapper). \ 336 get(test_source.module_name, None) 337 for device_option in device_options: 338 device_option.required_component = required_component 339 340 return device_options 341 342 @staticmethod 343 def __free_environment__(environment): 344 env_manager = EnvironmentManager() 345 env_manager.release_environment(environment) 346 347 def _check_device_spt(self, kit, driver_request, device): 348 kit_version = self._parse_version_id(driver_request, kit) 349 kit_spt = kit.properties.get(ConfigConst.spt, None) 350 if not kit_spt or not kit_version: 351 setattr(device, ConfigConst.task_state, False) 352 LOG.error("spt or version is empty", error_no="00108") 353 return 354 if getattr(driver_request, ConfigConst.product_info, ""): 355 product_info = getattr(driver_request, 356 ConfigConst.product_info) 357 if not isinstance(product_info, dict): 358 LOG.warning("product info should be dict, %s", 359 product_info) 360 setattr(device, ConfigConst.task_state, False) 361 return 362 device_spt = product_info.get("Security Patch", None) 363 device_version = product_info.get("VersionID", None) 364 if not device_version or not device_version == kit_version: 365 LOG.error("The device %s VersionID is %s, " 366 "and the test case version is %s, " 367 "which does not meet the requirements" % 368 (device.device_sn, device_version, kit_version), 369 error_no="00116") 370 setattr(device, ConfigConst.task_state, False) 371 return 372 373 if not device_spt or not \ 374 Scheduler.compare_spt_time(kit_spt, device_spt): 375 LOG.error("The device %s spt is %s, " 376 "and the test case spt is %s, " 377 "which does not meet the requirements" % 378 (device.device_sn, device_spt, kit_spt), 379 error_no="00116") 380 381 setattr(device, ConfigConst.task_state, False) 382 return 383 384 def _decc_task_setup(self, environment, task): 385 config = Config() 386 config.update(task.config.__dict__) 387 config.environment = environment 388 driver_request = Request(config=config) 389 390 if environment is None: 391 return False 392 393 for device in environment.devices: 394 if not getattr(device, ConfigConst.need_kit_setup, True): 395 LOG.debug("device %s need kit setup is false" % device) 396 continue 397 398 # do task setup for device 399 kits_copy = copy.deepcopy(task.config.kits) 400 setattr(device, ConfigConst.task_kits, kits_copy) 401 for kit in getattr(device, ConfigConst.task_kits, []): 402 if not Scheduler.is_execute: 403 break 404 try: 405 kit.__setup__(device, request=driver_request) 406 except (ParamError, ExecuteTerminate, DeviceError, 407 LiteDeviceError, ValueError, TypeError, 408 SyntaxError, AttributeError) as exception: 409 error_no = getattr(exception, "error_no", "00000") 410 LOG.exception( 411 "task setup device: %s, exception: %s" % ( 412 environment.__get_serial__(), 413 exception), exc_info=False, error_no=error_no) 414 if kit.__class__.__name__ == CKit.query: 415 self._check_device_spt(kit, driver_request, device) 416 LOG.debug("set device %s need kit setup to false" % device) 417 setattr(device, ConfigConst.need_kit_setup, False) 418 419 for device in environment.devices: 420 if not getattr(device, ConfigConst.task_state, True): 421 return False 422 423 # set product_info to self.task 424 if getattr(driver_request, ConfigConst.product_info, "") and \ 425 not getattr(task, ConfigConst.product_info, ""): 426 product_info = getattr(driver_request, ConfigConst.product_info) 427 if not isinstance(product_info, dict): 428 LOG.warning("product info should be dict, %s", 429 product_info) 430 else: 431 setattr(task, ConfigConst.product_info, product_info) 432 return True 433 434 def _dynamic_concurrent_execute(self, task, used_devices): 435 # initial params 436 current_driver_threads = {} 437 test_drivers = task.test_drivers 438 message_queue = queue.Queue() 439 task_unused_env = [] 440 441 # execute test drivers 442 queue_monitor_thread = self._start_queue_monitor( 443 message_queue, test_drivers, current_driver_threads) 444 while test_drivers: 445 # clear remaining test drivers when scheduler is terminated 446 if not Scheduler.is_execute: 447 LOG.info("clear test drivers") 448 self._clear_not_executed(task, test_drivers) 449 break 450 451 # get test driver and device 452 test_driver = test_drivers[0] 453 454 if getattr(task.config, ConfigConst.history_report_path, ""): 455 module_name = test_driver[1].source.module_name 456 if not self.is_module_need_retry(task, module_name): 457 self._display_executing_process(None, test_driver, 458 test_drivers) 459 LOG.info("%s are passed, no need to retry" % module_name) 460 self._append_history_result(task, module_name) 461 LOG.info("") 462 test_drivers.pop(0) 463 continue 464 465 if getattr(task.config, ConfigConst.component_mapper, ""): 466 module_name = test_driver[1].source.module_name 467 self.component_task_setup(task, module_name) 468 469 # get environment 470 try: 471 environment = self.__allocate_environment__( 472 task.config.__dict__, test_driver) 473 except DeviceError as exception: 474 self._handle_device_error(exception, task, test_drivers) 475 continue 476 477 if not Scheduler.is_execute: 478 if environment: 479 Scheduler.__free_environment__(environment) 480 continue 481 482 if check_mode(ModeType.decc) or getattr( 483 task.config, ConfigConst.check_device, False): 484 LOG.info("start to check environment: %s" % 485 environment.__get_serial__()) 486 status = self._decc_task_setup(environment, task) 487 if not status: 488 Scheduler.__free_environment__(environment) 489 task_unused_env.append(environment) 490 error_message = "Load Error[00116]" 491 self.report_not_executed(task.config.report_path, 492 [test_drivers[0]], 493 error_message, task) 494 test_drivers.pop(0) 495 continue 496 else: 497 LOG.info("environment %s check success", 498 environment.__get_serial__()) 499 500 # display executing progress 501 self._display_executing_process(environment, test_driver, 502 test_drivers) 503 504 # add to used devices and set need_kit_setup attribute 505 self._append_used_devices(environment, used_devices) 506 507 # start driver thread 508 self._start_driver_thread(current_driver_threads, ( 509 environment, message_queue, task, test_driver)) 510 test_drivers.pop(0) 511 512 # wait for all drivers threads finished and do kit teardown 513 while True: 514 if not queue_monitor_thread.is_alive(): 515 break 516 time.sleep(3) 517 518 self._do_taskkit_teardown(used_devices, task_unused_env) 519 520 @classmethod 521 def _append_history_result(cls, task, module_name): 522 history_report_path = getattr( 523 task.config, ConfigConst.history_report_path, "") 524 from _core.report.result_reporter import ResultReporter 525 params = ResultReporter.get_task_info_params( 526 history_report_path) 527 528 if not params or not params[4]: 529 LOG.debug("task info record data reports is empty") 530 return 531 532 report_data_dict = dict(params[4]) 533 if module_name not in report_data_dict.keys(): 534 module_name_ = str(module_name).split(".")[0] 535 if module_name_ not in report_data_dict.keys(): 536 LOG.error("%s not in data reports" % module_name) 537 return 538 module_name = module_name_ 539 540 from xdevice import SuiteReporter 541 if check_mode(ModeType.decc): 542 virtual_report_path, report_result = SuiteReporter. \ 543 get_history_result_by_module(module_name) 544 LOG.debug("append history result: (%s, %s)" % ( 545 virtual_report_path, report_result)) 546 SuiteReporter.append_report_result( 547 (virtual_report_path, report_result)) 548 else: 549 import shutil 550 history_execute_result = report_data_dict.get(module_name, "") 551 LOG.info("start copy %s" % history_execute_result) 552 file_name = get_filename_extension(history_execute_result)[0] 553 if os.path.exists(history_execute_result): 554 result_dir = \ 555 os.path.join(task.config.report_path, "result") 556 os.makedirs(result_dir, exist_ok=True) 557 target_execute_result = "%s.xml" % os.path.join( 558 task.config.report_path, "result", file_name) 559 shutil.copyfile(history_execute_result, target_execute_result) 560 LOG.info("copy %s to %s" % ( 561 history_execute_result, target_execute_result)) 562 else: 563 error_msg = "copy failed! %s not exists!" % \ 564 history_execute_result 565 raise ParamError(error_msg) 566 567 def _handle_device_error(self, exception, task, test_drivers): 568 self._display_executing_process(None, test_drivers[0], test_drivers) 569 error_message = "%s: %s" % \ 570 (get_instance_name(exception), exception) 571 LOG.exception(error_message, exc_info=False, 572 error_no=exception.error_no) 573 if check_mode(ModeType.decc): 574 error_message = "Load Error[00104]" 575 self.report_not_executed(task.config.report_path, [test_drivers[0]], 576 error_message, task) 577 578 LOG.info("") 579 test_drivers.pop(0) 580 581 @classmethod 582 def _clear_not_executed(cls, task, test_drivers): 583 if Scheduler.mode != ModeType.decc: 584 # clear all 585 test_drivers.clear() 586 return 587 # The result is reported only in DECC mode, and also clear all. 588 LOG.error("case no run: task execution terminated!", error_no="00300") 589 error_message = "Execute Terminate[00300]" 590 cls.report_not_executed(task.config.report_path, test_drivers, 591 error_message) 592 test_drivers.clear() 593 594 @classmethod 595 def report_not_executed(cls, report_path, test_drivers, error_message, 596 task=None): 597 # traversing list to get remained elements 598 for test_driver in test_drivers: 599 # get report file 600 if task and getattr(task.config, "testdict", ""): 601 report_file = os.path.join(get_sub_path( 602 test_driver[1].source.source_file), 603 "%s.xml" % test_driver[1].source.test_name) 604 else: 605 report_file = os.path.join( 606 report_path, "result", 607 "%s.xml" % test_driver[1].source.module_name) 608 609 # get report name 610 report_name = test_driver[1].source.test_name if \ 611 not test_driver[1].source.test_name.startswith("{") \ 612 else "report" 613 614 # get module name 615 module_name = test_driver[1].source.module_name 616 617 # here, normally create empty report and then upload result 618 check_result_report(report_path, report_file, error_message, 619 report_name, module_name) 620 621 def _start_driver_thread(self, current_driver_threads, thread_params): 622 environment, message_queue, task, test_driver = thread_params 623 thread_id = self._get_thread_id(current_driver_threads) 624 driver_thread = DriversThread(test_driver, task, environment, 625 message_queue) 626 driver_thread.setDaemon(True) 627 driver_thread.set_thread_id(thread_id) 628 driver_thread.set_listeners(self._create_listeners(task)) 629 driver_thread.start() 630 current_driver_threads.setdefault(thread_id, driver_thread) 631 632 @classmethod 633 def _do_taskkit_teardown(cls, used_devices, task_unused_env): 634 for device in used_devices.values(): 635 if getattr(device, ConfigConst.need_kit_setup, True): 636 continue 637 638 for kit in getattr(device, ConfigConst.task_kits, []): 639 try: 640 kit.__teardown__(device) 641 except Exception as error: 642 LOG.debug("do taskkit teardown: %s" % error) 643 setattr(device, ConfigConst.task_kits, []) 644 setattr(device, ConfigConst.need_kit_setup, True) 645 646 for environment in task_unused_env: 647 for device in environment.devices: 648 setattr(device, ConfigConst.task_state, True) 649 setattr(device, ConfigConst.need_kit_setup, True) 650 651 def _display_executing_process(self, environment, test_driver, 652 test_drivers): 653 source_content = test_driver[1].source.source_file or \ 654 test_driver[1].source.source_string 655 if environment is None: 656 LOG.info("[%d / %d] Executing: %s, Driver: %s" % 657 (self.test_number - len(test_drivers) + 1, 658 self.test_number, source_content, 659 test_driver[1].source.test_type)) 660 return 661 662 LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" % 663 (self.test_number - len(test_drivers) + 1, 664 self.test_number, source_content, 665 environment.__get_serial__(), 666 test_driver[1].source.test_type)) 667 668 @classmethod 669 def _get_thread_id(cls, current_driver_threads): 670 thread_id = datetime.datetime.now().strftime( 671 '%Y-%m-%d-%H-%M-%S-%f') 672 while thread_id in current_driver_threads.keys(): 673 thread_id = datetime.datetime.now().strftime( 674 '%Y-%m-%d-%H-%M-%S-%f') 675 return thread_id 676 677 @classmethod 678 def _append_used_devices(cls, environment, used_devices): 679 for device in environment.devices: 680 device_serial = device.__get_serial__() if device else "None" 681 if device_serial and device_serial not in used_devices.keys(): 682 used_devices[device_serial] = device 683 684 @staticmethod 685 def _start_queue_monitor(message_queue, test_drivers, 686 current_driver_threads): 687 queue_monitor_thread = QueueMonitorThread(message_queue, 688 current_driver_threads, 689 test_drivers) 690 queue_monitor_thread.setDaemon(True) 691 queue_monitor_thread.start() 692 return queue_monitor_thread 693 694 def exec_command(self, command, options): 695 """ 696 Directly executes a command without adding it to the command queue. 697 """ 698 if command != "run": 699 raise ParamError("unsupported command action: %s" % command, 700 error_no="00100") 701 exec_type = options.exectype 702 if exec_type in [TestExecType.device_test, TestExecType.host_test, 703 TestExecType.host_driven_test]: 704 self._exec_task(options) 705 else: 706 LOG.error("unsupported execution type '%s'" % exec_type, 707 error_no="00100") 708 709 return 710 711 def _exec_task(self, options): 712 """ 713 Directly allocates a device and execute a device test. 714 """ 715 try: 716 task = self.__discover__(options.__dict__) 717 self.__execute__(task) 718 except (ParamError, ValueError, TypeError, SyntaxError, 719 AttributeError) as exception: 720 error_no = getattr(exception, "error_no", "00000") 721 LOG.exception("%s: %s" % (get_instance_name(exception), exception), 722 exc_info=False, error_no=error_no) 723 if Scheduler.upload_address: 724 Scheduler.upload_unavailable_result(str(exception.args)) 725 Scheduler.upload_report_end() 726 finally: 727 self.stop_task_log() 728 self.stop_encrypt_log() 729 730 @classmethod 731 def _reset_environment(cls, environment="", config_file=""): 732 env_manager = EnvironmentManager() 733 env_manager.env_stop() 734 EnvironmentManager(environment, config_file) 735 736 @classmethod 737 def _restore_environment(cls): 738 env_manager = EnvironmentManager() 739 env_manager.env_stop() 740 EnvironmentManager() 741 742 @classmethod 743 def start_task_log(cls, log_path): 744 tool_file_name = "task_log.log" 745 tool_log_file = os.path.join(log_path, tool_file_name) 746 add_task_file_handler(tool_log_file) 747 748 @classmethod 749 def start_encrypt_log(cls, log_path): 750 from _core.report.encrypt import check_pub_key_exist 751 if check_pub_key_exist(): 752 encrypt_file_name = "task_log.ept" 753 encrypt_log_file = os.path.join(log_path, encrypt_file_name) 754 add_encrypt_file_handler(encrypt_log_file) 755 756 @classmethod 757 def stop_task_log(cls): 758 remove_task_file_handler() 759 760 @classmethod 761 def stop_encrypt_log(cls): 762 remove_encrypt_file_handler() 763 764 @staticmethod 765 def _find_test_root_descriptor(config): 766 if getattr(config, ConfigConst.task, None) or \ 767 getattr(config, ConfigConst.testargs, None): 768 Scheduler._pre_component_test(config) 769 770 if getattr(config, ConfigConst.subsystems, "") or \ 771 getattr(config, ConfigConst.parts, "") \ 772 or getattr(config, ConfigConst.component_base_kit, ""): 773 uid = unique_id("Scheduler", "component") 774 if config.subsystems or config.parts: 775 test_set = (config.subsystems, config.parts) 776 else: 777 kit = getattr(config, ConfigConst.component_base_kit) 778 test_set = kit.get_white_list() 779 780 root = Descriptor(uuid=uid, name="component", 781 source=TestSetSource(test_set), 782 container=True) 783 root.children = find_test_descriptors(config) 784 return root 785 786 # read test list from testdict 787 if getattr(config, ConfigConst.testdict, "") != "" and getattr( 788 config, ConfigConst.testfile, "") == "": 789 uid = unique_id("Scheduler", "testdict") 790 root = Descriptor(uuid=uid, name="testdict", 791 source=TestSetSource(config.testdict), 792 container=True) 793 root.children = find_testdict_descriptors(config) 794 return root 795 796 # read test list from testfile, testlist or task 797 test_set = getattr(config, ConfigConst.testfile, "") or getattr( 798 config, ConfigConst.testlist, "") or getattr( 799 config, ConfigConst.task, "") or getattr( 800 config, ConfigConst.testcase) 801 if test_set: 802 fname, _ = get_filename_extension(test_set) 803 uid = unique_id("Scheduler", fname) 804 root = Descriptor(uuid=uid, name=fname, 805 source=TestSetSource(test_set), container=True) 806 root.children = find_test_descriptors(config) 807 return root 808 else: 809 raise ParamError("no test file, list, dict, case or task found", 810 error_no="00102") 811 812 @classmethod 813 def terminate_cmd_exec(cls): 814 Scheduler.is_execute = False 815 LOG.info("start to terminate execution") 816 return Scheduler.terminate_result.get() 817 818 @classmethod 819 def upload_case_result(cls, upload_param): 820 if not Scheduler.upload_address: 821 return 822 case_id, result, error, start_time, end_time, report_path = \ 823 upload_param 824 if error and len(error) > 150: 825 error = "%s..." % error[:150] 826 LOG.info( 827 "get upload params: %s, %s, %s, %s, %s, %s" % ( 828 case_id, result, error, start_time, end_time, report_path)) 829 if Scheduler.proxy: 830 Scheduler.proxy.upload_result(case_id, result, error, 831 start_time, end_time, report_path) 832 return 833 from agent.factory import upload_result 834 upload_result(case_id, result, error, start_time, end_time, 835 report_path) 836 837 @classmethod 838 def upload_module_result(cls, exec_message): 839 if not Scheduler.is_execute: 840 return 841 result_file = exec_message.get_result() 842 request = exec_message.get_request() 843 844 test_name = request.root.source.test_name 845 if not result_file or not os.path.exists(result_file): 846 LOG.error("%s result not exists", test_name, error_no="00200") 847 return 848 849 test_type = request.root.source.test_type 850 LOG.info("need upload result: %s, test type: %s" % 851 (result_file, test_type)) 852 upload_params, _, _ = cls._get_upload_params(result_file, request) 853 if not upload_params: 854 LOG.error("%s no test case result to upload" % result_file, 855 error_no="00201") 856 return 857 LOG.info("need upload %s case" % len(upload_params)) 858 upload_suite = [] 859 for upload_param in upload_params: 860 case_id, result, error, start_time, end_time, report_path = \ 861 upload_param 862 case = {"caseid": case_id, "result": result, "error": error, 863 "start": start_time, "end": end_time, 864 "report": report_path} 865 LOG.info("case info: %s", case) 866 upload_suite.append(case) 867 if Scheduler.proxy: 868 Scheduler.proxy.upload_result(case_id, result, error, 869 start_time, end_time, report_path) 870 return 871 from agent.factory import upload_batch 872 upload_batch(upload_suite) 873 874 @classmethod 875 def _get_upload_params(cls, result_file, request): 876 upload_params = [] 877 878 report_path = result_file 879 task_log_path = os.path.join(request.config.report_path, "log", 880 "task_log.log") 881 testsuites_element = DataHelper.parse_data_report(report_path) 882 start_time, end_time = cls._get_time(testsuites_element) 883 884 for testsuite_element in testsuites_element: 885 if check_mode(ModeType.developer): 886 module_name = str(get_filename_extension( 887 report_path)[0]).split(".")[0] 888 else: 889 module_name = testsuite_element.get(ReportConstant.name, 890 "none") 891 for case_element in testsuite_element: 892 case_id = cls._get_case_id(case_element, module_name) 893 case_result, error = cls._get_case_result(case_element) 894 if error and len(error) > 150: 895 error = "%s..." % error[:150] 896 if case_result == "Ignored": 897 LOG.info("get upload params: %s result is ignored", 898 case_id) 899 continue 900 upload_params.append( 901 (case_id, case_result, error, start_time, 902 end_time, task_log_path)) 903 return upload_params, start_time, end_time 904 905 @classmethod 906 def get_script_result(cls, model_element): 907 disabled = int(model_element.get(ReportConstant.disabled)) if \ 908 model_element.get(ReportConstant.disabled, "") else 0 909 failures = int(model_element.get(ReportConstant.failures)) if \ 910 model_element.get(ReportConstant.failures, "") else 0 911 errors = int(model_element.get(ReportConstant.errors)) if \ 912 model_element.get(ReportConstant.errors, "") else 0 913 unavailable = int(model_element.get(ReportConstant.unavailable)) if \ 914 model_element.get(ReportConstant.unavailable, "") else 0 915 if failures > 0 or errors > 0: 916 result = "Failed" 917 elif disabled > 0 or unavailable > 0: 918 result = "Unavailable" 919 else: 920 result = "Passed" 921 922 if result == "Passed": 923 return result, "" 924 if Scheduler.mode == ModeType.decc: 925 result = "Failed" 926 927 error_msg = model_element.get(ReportConstant.message, "") 928 if not error_msg and len(model_element) > 0: 929 error_msg = model_element[0].get(ReportConstant.message, "") 930 if not error_msg and len(model_element[0]) > 0: 931 error_msg = model_element[0][0].get(ReportConstant.message, "") 932 return result, error_msg 933 934 @classmethod 935 def _get_case_id(cls, case_element, package_name): 936 class_name = case_element.get(ReportConstant.class_name, "none") 937 method_name = case_element.get(ReportConstant.name, "none") 938 case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name, 939 class_name, method_name) 940 return case_id 941 942 @classmethod 943 def _get_case_result(cls, case_element): 944 # get result 945 case = Case() 946 case.status = case_element.get(ReportConstant.status, "") 947 case.result = case_element.get(ReportConstant.result, "") 948 if case_element.get(ReportConstant.message, ""): 949 case.message = case_element.get(ReportConstant.message) 950 if len(case_element) > 0: 951 if not case.result: 952 case.result = ReportConstant.false 953 case.message = case_element[0].get(ReportConstant.message) 954 if case.is_passed(): 955 result = "Passed" 956 elif case.is_failed(): 957 result = "Failed" 958 elif case.is_blocked(): 959 result = "Blocked" 960 elif case.is_ignored(): 961 result = "Ignored" 962 else: 963 result = "Unavailable" 964 return result, case.message 965 966 @classmethod 967 def _get_time(cls, testsuite_element): 968 start_time = testsuite_element.get(ReportConstant.start_time, "") 969 end_time = testsuite_element.get(ReportConstant.end_time, "") 970 try: 971 if start_time and end_time: 972 start_time = int(time.mktime(time.strptime( 973 start_time, ReportConstant.time_format)) * 1000) 974 end_time = int(time.mktime(time.strptime( 975 end_time, ReportConstant.time_format)) * 1000) 976 else: 977 timestamp = str(testsuite_element.get( 978 ReportConstant.time_stamp, "")).replace("T", " ") 979 cost_time = testsuite_element.get(ReportConstant.time, "") 980 if timestamp and cost_time: 981 try: 982 end_time = int(time.mktime(time.strptime( 983 timestamp, ReportConstant.time_format)) * 1000) 984 except ArithmeticError as error: 985 LOG.error("get time error %s" % error) 986 end_time = int(time.time() * 1000) 987 start_time = int(end_time - float(cost_time) * 1000) 988 else: 989 current_time = int(time.time() * 1000) 990 start_time, end_time = current_time, current_time 991 except ArithmeticError as error: 992 LOG.error("get time error %s" % error) 993 current_time = int(time.time() * 1000) 994 start_time, end_time = current_time, current_time 995 return start_time, end_time 996 997 @classmethod 998 def upload_task_result(cls, task, error_message=""): 999 if not Scheduler.task_name: 1000 LOG.info("no need upload summary report") 1001 return 1002 1003 summary_data_report = os.path.join(task.config.report_path, 1004 ReportConstant.summary_data_report) 1005 summary_vision_report = os.path.join( 1006 task.config.report_path, ReportConstant.summary_vision_report) 1007 if not os.path.exists(summary_data_report): 1008 task_log_path = os.path.join(task.config.report_path, "log", 1009 "task_log.log") 1010 if not os.path.exists(task_log_path): 1011 task_log_path = "" 1012 Scheduler.upload_unavailable_result(str( 1013 error_message) or "summary report not exists", task_log_path) 1014 return 1015 1016 task_element = ElementTree.parse(summary_data_report).getroot() 1017 start_time, end_time = cls._get_time(task_element) 1018 task_result = cls._get_task_result(task_element) 1019 if task_result == "Unavailable": 1020 summary_vision_report = os.path.join(task.config.report_path, 1021 "log", "task_log.log") 1022 error_msg = "" 1023 for child in task_element: 1024 if child.get(ReportConstant.message, ""): 1025 error_msg = "{}{}".format( 1026 error_msg, "%s;" % child.get(ReportConstant.message)) 1027 if error_msg: 1028 error_msg = error_msg[:-1] 1029 cls.upload_case_result((Scheduler.task_name, task_result, 1030 error_msg, start_time, end_time, 1031 summary_vision_report)) 1032 1033 @classmethod 1034 def _get_task_result(cls, task_element): 1035 failures = int(task_element.get(ReportConstant.failures, 0)) 1036 errors = int(task_element.get(ReportConstant.errors, 0)) 1037 disabled = int(task_element.get(ReportConstant.disabled, 0)) 1038 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1039 if disabled > 0: 1040 task_result = "Blocked" 1041 elif errors > 0 or failures > 0: 1042 task_result = "Failed" 1043 elif unavailable > 0: 1044 task_result = "Unavailable" 1045 else: 1046 task_result = "Passed" 1047 return task_result 1048 1049 @classmethod 1050 def upload_unavailable_result(cls, error_msg, report_path=""): 1051 start_time = int(time.time() * 1000) 1052 Scheduler.upload_case_result((Scheduler.task_name, "Unavailable", 1053 error_msg, start_time, start_time, 1054 report_path)) 1055 1056 @classmethod 1057 def upload_report_end(cls): 1058 LOG.info("Upload report end") 1059 if Scheduler.proxy is not None: 1060 Scheduler.proxy.report_end() 1061 return 1062 from agent.factory import report_end 1063 report_end() 1064 1065 @classmethod 1066 def is_module_need_retry(cls, task, module_name): 1067 failed_flag = False 1068 if check_mode(ModeType.decc): 1069 from xdevice import SuiteReporter 1070 for module, failed in SuiteReporter.get_failed_case_list(): 1071 if module_name == module or str(module_name).split( 1072 ".")[0] == module: 1073 failed_flag = True 1074 break 1075 else: 1076 from xdevice import ResultReporter 1077 history_report_path = \ 1078 getattr(task.config, ConfigConst.history_report_path, "") 1079 params = ResultReporter.get_task_info_params(history_report_path) 1080 if params and params[3]: 1081 if dict(params[3]).get(module_name, []): 1082 failed_flag = True 1083 elif dict(params[3]).get(str(module_name).split(".")[0], []): 1084 failed_flag = True 1085 return failed_flag 1086 1087 @classmethod 1088 def compare_spt_time(cls, kit_spt, device_spt): 1089 if not kit_spt or not device_spt: 1090 return False 1091 try: 1092 kit_time = str(kit_spt).split("-")[:2] 1093 device_time = str(device_spt).split("-")[:2] 1094 k_spt = datetime.datetime.strptime( 1095 "-".join(kit_time), "%Y-%m") 1096 d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m") 1097 except ValueError as value_error: 1098 LOG.debug("date format is error, %s" % value_error.args) 1099 return False 1100 month_interval = int(k_spt.month) - int(d_spt.month) 1101 year_interval = int(k_spt.year) - int(d_spt.year) 1102 LOG.debug("kit spt (year=%s, month=%s), device spt (year=%s, month=%s)" 1103 % (k_spt.year, k_spt.month, d_spt.year, d_spt.month)) 1104 if year_interval == 0 and month_interval in (0, 1, 2): 1105 return True 1106 if year_interval == 1 and month_interval + 12 in (1, 2): 1107 return True 1108 1109 @classmethod 1110 def _parse_version_id(cls, driver_request, kit): 1111 test_args = copy.deepcopy( 1112 driver_request.config.get(ConfigConst.testargs, dict())) 1113 version_id = "" 1114 if ConfigConst.pass_through in test_args.keys(): 1115 import json 1116 pt_dict = json.loads(test_args.get(ConfigConst.pass_through, "")) 1117 version_id = pt_dict.get("VersionID", None) 1118 elif "VersionID" in test_args.keys(): 1119 version_id = test_args.get("VersionID", None) 1120 if version_id: 1121 kit_version = version_id 1122 else: 1123 kit_version = kit.properties.get(ConfigConst.version, None) 1124 return kit_version 1125 1126 @classmethod 1127 def update_test_type_in_source(cls, key, value): 1128 LOG.debug("update test type dict in source") 1129 TestDictSource.test_type[key] = value 1130 1131 @classmethod 1132 def update_ext_type_in_source(cls, key, value): 1133 LOG.debug("update ext type dict in source") 1134 TestDictSource.exe_type[key] = value 1135 1136 @classmethod 1137 def _clear_test_dict_source(cls): 1138 TestDictSource.exe_type.clear() 1139 TestDictSource.test_type.clear() 1140 1141 @classmethod 1142 def _pre_component_test(cls, config): 1143 if not config.kits: 1144 return 1145 cur_kit = None 1146 for kit in config.kits: 1147 if kit.__class__.__name__ == CKit.component: 1148 cur_kit = kit 1149 break 1150 if not cur_kit: 1151 return 1152 get_white_list = getattr(cur_kit, "get_white_list", None) 1153 if not callable(get_white_list): 1154 return 1155 subsystems, parts = get_white_list() 1156 if not subsystems and not parts: 1157 return 1158 setattr(config, ConfigConst.component_base_kit, cur_kit) 1159 1160 @classmethod 1161 def component_task_setup(cls, task, module_name): 1162 component_kit = task.config.get(ConfigConst.component_base_kit, None) 1163 if not component_kit: 1164 # only -p -s .you do not care about the components that can be 1165 # supported. you only want to run the use cases of the current 1166 # component 1167 return 1168 LOG.debug("Start component task setup") 1169 _component_mapper = task.config.get(ConfigConst.component_mapper) 1170 _subsystem, _part = _component_mapper.get(module_name) 1171 1172 is_hit = False 1173 # find in cache. if not find, update cache 1174 cache_subsystem, cache_part = component_kit.get_cache() 1175 if _subsystem in cache_subsystem or _part in cache_subsystem: 1176 is_hit = True 1177 if not is_hit: 1178 env_manager = EnvironmentManager() 1179 for _, manager in env_manager.managers.items(): 1180 if getattr(manager, "devices_list", []): 1181 for device in manager.devices_list: 1182 component_kit.__setup__(device) 1183 cache_subsystem, cache_part = component_kit.get_cache() 1184 if _subsystem in cache_subsystem or _part in cache_subsystem: 1185 is_hit = True 1186 if not is_hit: 1187 LOG.warning("%s are skipped, no suitable component found. " 1188 "Require subsystem=%s part=%s, no device match this" 1189 % (module_name, _subsystem, _part)) 1190