1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import datetime 21import os 22import queue 23import time 24import uuid 25import shutil 26from xml.etree import ElementTree 27 28from _core.utils import unique_id 29from _core.utils import check_mode 30from _core.utils import get_sub_path 31from _core.utils import get_filename_extension 32from _core.utils import convert_serial 33from _core.utils import get_instance_name 34from _core.utils import is_config_str 35from _core.utils import check_result_report 36from _core.utils import get_cst_time 37from _core.environment.manager_env import EnvironmentManager 38from _core.environment.manager_env import DeviceSelectionOption 39from _core.exception import ParamError 40from _core.exception import ExecuteTerminate 41from _core.exception import LiteDeviceError 42from _core.exception import DeviceError 43from _core.interface import LifeCycle 44from _core.executor.request import Request 45from _core.executor.request import Task 46from _core.executor.request import Descriptor 47from _core.plugin import get_plugin 48from _core.plugin import Plugin 49from _core.plugin import Config 50from _core.report.reporter_helper import ExecInfo 51from _core.report.reporter_helper import ReportConstant 52from _core.report.reporter_helper import Case 53from _core.report.reporter_helper import DataHelper 54from _core.constants import TestExecType 55from _core.constants import CKit 56from _core.constants import ModeType 57from _core.constants import DeviceLabelType 58from _core.constants import SchedulerType 59from _core.constants import ListenerType 60from _core.constants import ConfigConst 61from _core.constants import ReportConst 62from _core.constants import HostDrivenTestType 63from _core.executor.concurrent import DriversThread 64from _core.executor.concurrent import QueueMonitorThread 65from _core.executor.concurrent import DriversDryRunThread 66from _core.executor.source import TestSetSource 67from _core.executor.source import find_test_descriptors 68from _core.executor.source import find_testdict_descriptors 69from _core.executor.source import TestDictSource 70from _core.logger import platform_logger 71from _core.logger import add_task_file_handler 72from _core.logger import remove_task_file_handler 73from _core.logger import add_encrypt_file_handler 74from _core.logger import remove_encrypt_file_handler 75 76__all__ = ["Scheduler"] 77LOG = platform_logger("Scheduler") 78 79MAX_VISIBLE_LENGTH = 150 80 81 82@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler) 83class Scheduler(object): 84 """ 85 The Scheduler is the main entry point for client code that wishes to 86 discover and execute tests. 87 """ 88 # factory params 89 is_execute = True 90 terminate_result = queue.Queue() 91 upload_address = "" 92 task_type = "" 93 task_name = "" 94 mode = "" 95 proxy = None 96 97 # command_queue to store test commands 98 command_queue = [] 99 max_command_num = 50 100 # the number of tests in current task 101 test_number = 0 102 device_labels = [] 103 104 def __discover__(self, args): 105 """Discover task to execute""" 106 config = Config() 107 config.update(args) 108 task = Task(drivers=[]) 109 task.init(config) 110 111 TestDictSource.reset() 112 root_descriptor = self._find_test_root_descriptor(task.config) 113 task.set_root_descriptor(root_descriptor) 114 return task 115 116 def __execute__(self, task): 117 error_message = "" 118 try: 119 Scheduler.is_execute = True 120 if Scheduler.command_queue: 121 LOG.debug("Run command: %s" % Scheduler.command_queue[-1]) 122 run_command = Scheduler.command_queue.pop() 123 task_id = str(uuid.uuid1()).split("-")[0] 124 Scheduler.command_queue.append((task_id, run_command, 125 task.config.report_path)) 126 if len(Scheduler.command_queue) > self.max_command_num: 127 Scheduler.command_queue.pop(0) 128 129 if getattr(task.config, ConfigConst.test_environment, ""): 130 self._reset_environment(task.config.get( 131 ConfigConst.test_environment, "")) 132 elif getattr(task.config, ConfigConst.configfile, ""): 133 self._reset_environment(config_file=task.config.get( 134 ConfigConst.configfile, "")) 135 136 # do with the count of repeat about a task 137 if getattr(task.config, ConfigConst.repeat, 0) > 0: 138 drivers_list = list() 139 for repeat_index in range(task.config.repeat): 140 for driver_index in range(len(task.test_drivers)): 141 drivers_list.append( 142 copy.deepcopy(task.test_drivers[driver_index])) 143 task.test_drivers = drivers_list 144 145 self.test_number = len(task.test_drivers) 146 147 if task.config.exectype == TestExecType.device_test: 148 self._device_test_execute(task) 149 elif task.config.exectype == TestExecType.host_test: 150 self._host_test_execute(task) 151 else: 152 LOG.info("Exec type %s is bypassed" % task.config.exectype) 153 154 except (ParamError, ValueError, TypeError, SyntaxError, AttributeError, 155 DeviceError, LiteDeviceError, ExecuteTerminate) as exception: 156 error_no = getattr(exception, "error_no", "") 157 error_message = "%s[%s]" % (str(exception), error_no) \ 158 if error_no else str(exception) 159 error_no = error_no if error_no else "00000" 160 LOG.exception(exception, exc_info=False, error_no=error_no) 161 162 finally: 163 Scheduler._clear_test_dict_source() 164 if getattr(task.config, ConfigConst.test_environment, "") or \ 165 getattr(task.config, ConfigConst.configfile, ""): 166 self._restore_environment() 167 168 if Scheduler.upload_address: 169 Scheduler.upload_task_result(task, error_message) 170 Scheduler.upload_report_end() 171 172 def _device_test_execute(self, task): 173 used_devices = {} 174 try: 175 self._dynamic_concurrent_execute(task, used_devices) 176 finally: 177 Scheduler.__reset_environment__(used_devices) 178 # generate reports 179 self._generate_task_report(task, used_devices) 180 181 def _host_test_execute(self, task): 182 """Execute host test""" 183 try: 184 # initial params 185 current_driver_threads = {} 186 test_drivers = task.test_drivers 187 message_queue = queue.Queue() 188 189 # execute test drivers 190 queue_monitor_thread = self._start_queue_monitor( 191 message_queue, test_drivers, current_driver_threads) 192 while test_drivers: 193 if len(current_driver_threads) > 5: 194 time.sleep(3) 195 continue 196 197 # clear remaining test drivers when scheduler is terminated 198 if not Scheduler.is_execute: 199 LOG.info("Clear test drivers") 200 self._clear_not_executed(task, test_drivers) 201 break 202 203 # get test driver and device 204 test_driver = test_drivers[0] 205 206 # display executing progress 207 self._display_executing_process(None, test_driver, 208 test_drivers) 209 210 # start driver thread 211 self._start_driver_thread(current_driver_threads, ( 212 None, message_queue, task, test_driver)) 213 test_drivers.pop(0) 214 215 # wait for all drivers threads finished and do kit teardown 216 while True: 217 if not queue_monitor_thread.is_alive(): 218 break 219 time.sleep(3) 220 221 finally: 222 # generate reports 223 self._generate_task_report(task) 224 225 def _dry_run_device_test_execute(self, task): 226 try: 227 # initial params 228 used_devices = {} 229 current_driver_threads = {} 230 test_drivers = task.test_drivers 231 message_queue = queue.Queue() 232 task_unused_env = [] 233 234 # execute test drivers 235 queue_monitor_thread = self._start_queue_monitor( 236 message_queue, test_drivers, current_driver_threads) 237 while test_drivers: 238 # clear remaining test drivers when scheduler is terminated 239 if not Scheduler.is_execute: 240 LOG.info("Clear test drivers") 241 self._clear_not_executed(task, test_drivers) 242 break 243 244 # get test driver and device 245 test_driver = test_drivers[0] 246 # get environment 247 try: 248 environment = self.__allocate_environment__( 249 task.config.__dict__, test_driver) 250 except DeviceError as exception: 251 self._handle_device_error(exception, task, test_drivers) 252 continue 253 254 if not Scheduler.is_execute: 255 if environment: 256 Scheduler.__free_environment__(environment) 257 continue 258 259 # start driver thread 260 thread_id = self._get_thread_id(current_driver_threads) 261 driver_thread = DriversDryRunThread(test_driver, task, environment, 262 message_queue) 263 driver_thread.setDaemon(True) 264 driver_thread.set_thread_id(thread_id) 265 driver_thread.start() 266 current_driver_threads.setdefault(thread_id, driver_thread) 267 268 test_drivers.pop(0) 269 270 # wait for all drivers threads finished and do kit teardown 271 while True: 272 if not queue_monitor_thread.is_alive(): 273 break 274 time.sleep(3) 275 276 self._do_taskkit_teardown(used_devices, task_unused_env) 277 finally: 278 LOG.debug("Removing report_path: {}".format(task.config.report_path)) 279 # delete reports 280 self.stop_task_logcat() 281 self.stop_encrypt_log() 282 shutil.rmtree(task.config.report_path) 283 284 def _generate_task_report(self, task, used_devices=None): 285 task_info = ExecInfo() 286 test_type = getattr(task.config, "testtype", []) 287 task_name = getattr(task.config, "task", "") 288 if task_name: 289 task_info.test_type = str(task_name).upper() 290 else: 291 task_info.test_type = ",".join(test_type) if test_type else "Test" 292 if used_devices: 293 serials = [] 294 platforms = [] 295 for serial, device in used_devices.items(): 296 serials.append(convert_serial(serial)) 297 platform = str(device.label).capitalize() 298 if platform not in platforms: 299 platforms.append(platform) 300 task_info.device_name = ",".join(serials) 301 task_info.platform = ",".join(platforms) 302 else: 303 task_info.device_name = "None" 304 task_info.platform = "None" 305 task_info.test_time = task.config.start_time 306 task_info.product_info = getattr(task, "product_info", "") 307 308 listeners = self._create_listeners(task) 309 for listener in listeners: 310 listener.__ended__(LifeCycle.TestTask, task_info, 311 test_type=task_info.test_type) 312 313 @classmethod 314 def _create_listeners(cls, task): 315 listeners = [] 316 # append log listeners 317 log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log) 318 for log_listener in log_listeners: 319 log_listener_instance = log_listener.__class__() 320 listeners.append(log_listener_instance) 321 # append report listeners 322 report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report) 323 for report_listener in report_listeners: 324 report_listener_instance = report_listener.__class__() 325 setattr(report_listener_instance, "report_path", 326 task.config.report_path) 327 listeners.append(report_listener_instance) 328 # append upload listeners 329 upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload) 330 for upload_listener in upload_listeners: 331 upload_listener_instance = upload_listener.__class__() 332 listeners.append(upload_listener_instance) 333 return listeners 334 335 @staticmethod 336 def _find_device_options(environment_config, options, test_source): 337 devices_option = [] 338 index = 1 339 for device_dict in environment_config: 340 label = device_dict.get("label", "") 341 required_manager = device_dict.get("type", "device") 342 required_manager = \ 343 required_manager if required_manager else "device" 344 if not label: 345 continue 346 device_option = DeviceSelectionOption(options, label, test_source) 347 device_dict.pop("type", None) 348 device_dict.pop("label", None) 349 device_option.required_manager = required_manager 350 device_option.extend_value = device_dict 351 device_option.source_file = \ 352 test_source.config_file or test_source.source_string 353 if hasattr(device_option, "env_index"): 354 device_option.env_index = index 355 index += 1 356 devices_option.append(device_option) 357 return devices_option 358 359 def __allocate_environment__(self, options, test_driver): 360 device_options = self.get_device_options(options, 361 test_driver[1].source) 362 environment = None 363 env_manager = EnvironmentManager() 364 while True: 365 if not Scheduler.is_execute: 366 break 367 environment = env_manager.apply_environment(device_options) 368 if len(environment.devices) == len(device_options): 369 return environment 370 else: 371 env_manager.release_environment(environment) 372 LOG.debug("'%s' is waiting available device", 373 test_driver[1].source.test_name) 374 if env_manager.check_device_exist(device_options): 375 continue 376 else: 377 LOG.debug("'%s' required %s devices, actually %s devices" 378 " were found" % (test_driver[1].source.test_name, 379 len(device_options), 380 len(environment.devices))) 381 raise DeviceError("The '%s' required device does not exist" 382 % test_driver[1].source.source_file, 383 error_no="00104") 384 385 return environment 386 387 @classmethod 388 def get_device_options(cls, options, test_source): 389 device_options = [] 390 config_file = test_source.config_file 391 environment_config = [] 392 from _core.testkit.json_parser import JsonParser 393 if test_source.source_string and is_config_str( 394 test_source.source_string): 395 json_config = JsonParser(test_source.source_string) 396 environment_config = json_config.get_environment() 397 device_options = cls._find_device_options( 398 environment_config, options, test_source) 399 elif config_file and os.path.exists(config_file): 400 json_config = JsonParser(test_source.config_file) 401 environment_config = json_config.get_environment() 402 device_options = cls._find_device_options( 403 environment_config, options, test_source) 404 405 device_options = cls._calculate_device_options( 406 device_options, environment_config, options, test_source) 407 408 if ConfigConst.component_mapper in options.keys(): 409 required_component = options.get(ConfigConst.component_mapper). \ 410 get(test_source.module_name, None) 411 for device_option in device_options: 412 device_option.required_component = required_component 413 return device_options 414 415 @staticmethod 416 def __free_environment__(environment): 417 env_manager = EnvironmentManager() 418 env_manager.release_environment(environment) 419 420 @staticmethod 421 def __reset_environment__(used_devices): 422 env_manager = EnvironmentManager() 423 env_manager.reset_environment(used_devices) 424 425 @classmethod 426 def _check_device_spt(cls, kit, driver_request, device): 427 kit_spt = cls._parse_property_value(ConfigConst.spt, 428 driver_request, kit) 429 if not kit_spt: 430 setattr(device, ConfigConst.task_state, False) 431 LOG.error("Spt is empty", error_no="00108") 432 return 433 if getattr(driver_request, ConfigConst.product_info, ""): 434 product_info = getattr(driver_request, 435 ConfigConst.product_info) 436 if not isinstance(product_info, dict): 437 LOG.warning("Product info should be dict, %s", 438 product_info) 439 setattr(device, ConfigConst.task_state, False) 440 return 441 device_spt = product_info.get("Security Patch", None) 442 if not device_spt or not \ 443 Scheduler.compare_spt_time(kit_spt, device_spt): 444 LOG.error("The device %s spt is %s, " 445 "and the test case spt is %s, " 446 "which does not meet the requirements" % 447 (device.device_sn, device_spt, kit_spt), 448 error_no="00116") 449 setattr(device, ConfigConst.task_state, False) 450 return 451 452 def _decc_task_setup(self, environment, task): 453 config = Config() 454 config.update(task.config.__dict__) 455 config.environment = environment 456 driver_request = Request(config=config) 457 458 if environment is None: 459 return False 460 461 for device in environment.devices: 462 if not getattr(device, ConfigConst.need_kit_setup, True): 463 LOG.debug("Device %s need kit setup is false" % device) 464 continue 465 466 # do task setup for device 467 kits_copy = copy.deepcopy(task.config.kits) 468 setattr(device, ConfigConst.task_kits, kits_copy) 469 for kit in getattr(device, ConfigConst.task_kits, []): 470 if not Scheduler.is_execute: 471 break 472 try: 473 kit.__setup__(device, request=driver_request) 474 except (ParamError, ExecuteTerminate, DeviceError, 475 LiteDeviceError, ValueError, TypeError, 476 SyntaxError, AttributeError) as exception: 477 error_no = getattr(exception, "error_no", "00000") 478 LOG.exception( 479 "Task setup device: %s, exception: %s" % ( 480 environment.__get_serial__(), 481 exception), exc_info=False, error_no=error_no) 482 if kit.__class__.__name__ == CKit.query and \ 483 device.label in [DeviceLabelType.ipcamera]: 484 self._check_device_spt(kit, driver_request, device) 485 LOG.debug("Set device %s need kit setup to false" % device) 486 setattr(device, ConfigConst.need_kit_setup, False) 487 488 for device in environment.devices: 489 if not getattr(device, ConfigConst.task_state, True): 490 return False 491 492 # set product_info to self.task 493 if getattr(driver_request, ConfigConst.product_info, "") and \ 494 not getattr(task, ConfigConst.product_info, ""): 495 product_info = getattr(driver_request, ConfigConst.product_info) 496 if not isinstance(product_info, dict): 497 LOG.warning("Product info should be dict, %s", 498 product_info) 499 else: 500 setattr(task, ConfigConst.product_info, product_info) 501 return True 502 503 def _dynamic_concurrent_execute(self, task, used_devices): 504 # initial params 505 current_driver_threads = {} 506 test_drivers = task.test_drivers 507 message_queue = queue.Queue() 508 task_unused_env = [] 509 510 # execute test drivers 511 queue_monitor_thread = self._start_queue_monitor( 512 message_queue, test_drivers, current_driver_threads) 513 while test_drivers: 514 # clear remaining test drivers when scheduler is terminated 515 if not Scheduler.is_execute: 516 LOG.info("Clear test drivers") 517 self._clear_not_executed(task, test_drivers) 518 break 519 520 # get test driver and device 521 test_driver = test_drivers[0] 522 523 if getattr(task.config, ConfigConst.history_report_path, ""): 524 module_name = test_driver[1].source.module_name 525 if not self.is_module_need_retry(task, module_name): 526 self._display_executing_process(None, test_driver, 527 test_drivers) 528 LOG.info("%s are passed, no need to retry" % module_name) 529 self._append_history_result(task, module_name) 530 LOG.info("") 531 test_drivers.pop(0) 532 continue 533 534 if getattr(task.config, ConfigConst.component_mapper, ""): 535 module_name = test_driver[1].source.module_name 536 self.component_task_setup(task, module_name) 537 538 # get environment 539 try: 540 environment = self.__allocate_environment__( 541 task.config.__dict__, test_driver) 542 except DeviceError as exception: 543 self._handle_device_error(exception, task, test_drivers) 544 continue 545 546 if not Scheduler.is_execute: 547 if environment: 548 Scheduler.__free_environment__(environment) 549 continue 550 551 if check_mode(ModeType.decc) or getattr( 552 task.config, ConfigConst.check_device, False): 553 LOG.info("Start to check environment: %s" % 554 environment.__get_serial__()) 555 status = self._decc_task_setup(environment, task) 556 if not status: 557 Scheduler.__free_environment__(environment) 558 task_unused_env.append(environment) 559 error_message = "Load Error[00116]" 560 self.report_not_executed(task.config.report_path, 561 [test_drivers[0]], 562 error_message, task) 563 test_drivers.pop(0) 564 continue 565 else: 566 LOG.info("Environment %s check success", 567 environment.__get_serial__()) 568 569 # display executing progress 570 self._display_executing_process(environment, test_driver, 571 test_drivers) 572 573 # add to used devices and set need_kit_setup attribute 574 self._append_used_devices(environment, used_devices) 575 576 # start driver thread 577 self._start_driver_thread(current_driver_threads, ( 578 environment, message_queue, task, test_driver)) 579 test_drivers.pop(0) 580 581 # wait for all drivers threads finished and do kit teardown 582 while True: 583 if not queue_monitor_thread.is_alive(): 584 break 585 time.sleep(3) 586 587 self._do_taskkit_teardown(used_devices, task_unused_env) 588 589 @classmethod 590 def _append_history_result(cls, task, module_name): 591 history_report_path = getattr( 592 task.config, ConfigConst.history_report_path, "") 593 from _core.report.result_reporter import ResultReporter 594 params = ResultReporter.get_task_info_params( 595 history_report_path) 596 597 if not params or not params[ReportConst.data_reports]: 598 LOG.debug("Task info record data reports is empty") 599 return 600 601 report_data_dict = dict(params[ReportConst.data_reports]) 602 if module_name not in report_data_dict.keys(): 603 module_name_ = str(module_name).split(".")[0] 604 if module_name_ not in report_data_dict.keys(): 605 LOG.error("%s not in data reports" % module_name) 606 return 607 module_name = module_name_ 608 609 from xdevice import SuiteReporter 610 if check_mode(ModeType.decc): 611 virtual_report_path, report_result = SuiteReporter. \ 612 get_history_result_by_module(module_name) 613 LOG.debug("Append history result: (%s, %s)" % ( 614 virtual_report_path, report_result)) 615 SuiteReporter.append_report_result( 616 (virtual_report_path, report_result)) 617 else: 618 history_execute_result = report_data_dict.get(module_name, "") 619 LOG.info("Start copy %s" % history_execute_result) 620 file_name = get_filename_extension(history_execute_result)[0] 621 if os.path.exists(history_execute_result): 622 result_dir = \ 623 os.path.join(task.config.report_path, "result") 624 os.makedirs(result_dir, exist_ok=True) 625 target_execute_result = "%s.xml" % os.path.join( 626 task.config.report_path, "result", file_name) 627 shutil.copyfile(history_execute_result, target_execute_result) 628 LOG.info("Copy %s to %s" % ( 629 history_execute_result, target_execute_result)) 630 else: 631 error_msg = "Copy failed! %s not exists!" % \ 632 history_execute_result 633 raise ParamError(error_msg) 634 635 def _handle_device_error(self, exception, task, test_drivers): 636 self._display_executing_process(None, test_drivers[0], test_drivers) 637 error_message = "%s: %s" % \ 638 (get_instance_name(exception), exception) 639 LOG.exception(error_message, exc_info=False, 640 error_no=exception.error_no) 641 if check_mode(ModeType.decc): 642 error_message = "Load Error[00104]" 643 self.report_not_executed(task.config.report_path, [test_drivers[0]], 644 error_message, task) 645 646 LOG.info("") 647 test_drivers.pop(0) 648 649 @classmethod 650 def _clear_not_executed(cls, task, test_drivers): 651 if Scheduler.mode != ModeType.decc: 652 # clear all 653 test_drivers.clear() 654 return 655 # The result is reported only in DECC mode, and also clear all. 656 LOG.error("Case no run: task execution terminated!", error_no="00300") 657 error_message = "Execute Terminate[00300]" 658 cls.report_not_executed(task.config.report_path, test_drivers, 659 error_message) 660 test_drivers.clear() 661 662 @classmethod 663 def report_not_executed(cls, report_path, test_drivers, error_message, 664 task=None): 665 # traversing list to get remained elements 666 for test_driver in test_drivers: 667 # get report file 668 if task and getattr(task.config, "testdict", ""): 669 report_file = os.path.join(get_sub_path( 670 test_driver[1].source.source_file), 671 "%s.xml" % test_driver[1].source.test_name) 672 else: 673 report_file = os.path.join( 674 report_path, "result", 675 "%s.xml" % test_driver[1].source.module_name) 676 677 # get report name 678 report_name = test_driver[1].source.test_name if \ 679 not test_driver[1].source.test_name.startswith("{") \ 680 else "report" 681 682 # get module name 683 module_name = test_driver[1].source.module_name 684 685 # here, normally create empty report and then upload result 686 check_result_report(report_path, report_file, error_message, 687 report_name, module_name) 688 689 def _start_driver_thread(self, current_driver_threads, thread_params): 690 environment, message_queue, task, test_driver = thread_params 691 thread_id = self._get_thread_id(current_driver_threads) 692 driver_thread = DriversThread(test_driver, task, environment, 693 message_queue) 694 driver_thread.setDaemon(True) 695 driver_thread.set_thread_id(thread_id) 696 driver_thread.set_listeners(self._create_listeners(task)) 697 driver_thread.start() 698 current_driver_threads.setdefault(thread_id, driver_thread) 699 700 @classmethod 701 def _do_taskkit_teardown(cls, used_devices, task_unused_env): 702 for device in used_devices.values(): 703 if getattr(device, ConfigConst.need_kit_setup, True): 704 continue 705 706 for kit in getattr(device, ConfigConst.task_kits, []): 707 try: 708 kit.__teardown__(device) 709 except Exception as error: 710 LOG.debug("Do task kit teardown: %s" % error) 711 setattr(device, ConfigConst.task_kits, []) 712 setattr(device, ConfigConst.need_kit_setup, True) 713 714 for environment in task_unused_env: 715 for device in environment.devices: 716 setattr(device, ConfigConst.task_state, True) 717 setattr(device, ConfigConst.need_kit_setup, True) 718 719 def _display_executing_process(self, environment, test_driver, 720 test_drivers): 721 source_content = test_driver[1].source.source_file or \ 722 test_driver[1].source.source_string 723 if environment is None: 724 LOG.info("[%d / %d] Executing: %s, Driver: %s" % 725 (self.test_number - len(test_drivers) + 1, 726 self.test_number, source_content, 727 test_driver[1].source.test_type)) 728 return 729 730 LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" % 731 (self.test_number - len(test_drivers) + 1, 732 self.test_number, source_content, 733 environment.__get_serial__(), 734 test_driver[1].source.test_type)) 735 736 @classmethod 737 def _get_thread_id(cls, current_driver_threads): 738 thread_id = get_cst_time().strftime( 739 '%Y-%m-%d-%H-%M-%S-%f') 740 while thread_id in current_driver_threads.keys(): 741 thread_id = get_cst_time().strftime( 742 '%Y-%m-%d-%H-%M-%S-%f') 743 return thread_id 744 745 @classmethod 746 def _append_used_devices(cls, environment, used_devices): 747 if environment is not None: 748 for device in environment.devices: 749 device_serial = device.__get_serial__() if device else "None" 750 if device_serial and device_serial not in used_devices.keys(): 751 used_devices[device_serial] = device 752 753 @staticmethod 754 def _start_queue_monitor(message_queue, test_drivers, 755 current_driver_threads): 756 queue_monitor_thread = QueueMonitorThread(message_queue, 757 current_driver_threads, 758 test_drivers) 759 queue_monitor_thread.setDaemon(True) 760 queue_monitor_thread.start() 761 return queue_monitor_thread 762 763 def exec_command(self, command, options): 764 """ 765 Directly executes a command without adding it to the command queue. 766 """ 767 if command != "run": 768 raise ParamError("unsupported command action: %s" % command, 769 error_no="00100") 770 exec_type = options.exectype 771 if exec_type in [TestExecType.device_test, TestExecType.host_test, 772 TestExecType.host_driven_test]: 773 self._exec_task(options) 774 else: 775 LOG.error("Unsupported execution type '%s'" % exec_type, 776 error_no="00100") 777 778 return 779 780 def _exec_task(self, options): 781 """ 782 Directly allocates a device and execute a device test. 783 """ 784 try: 785 task = self.__discover__(options.__dict__) 786 self.__execute__(task) 787 except (ParamError, ValueError, TypeError, SyntaxError, 788 AttributeError) as exception: 789 error_no = getattr(exception, "error_no", "00000") 790 LOG.exception("%s: %s" % (get_instance_name(exception), exception), 791 exc_info=False, error_no=error_no) 792 if Scheduler.upload_address: 793 Scheduler.upload_unavailable_result(str(exception.args)) 794 Scheduler.upload_report_end() 795 finally: 796 self.stop_task_logcat() 797 self.stop_encrypt_log() 798 799 @classmethod 800 def _reset_environment(cls, environment="", config_file=""): 801 env_manager = EnvironmentManager() 802 env_manager.env_stop() 803 EnvironmentManager(environment, config_file) 804 805 @classmethod 806 def _restore_environment(cls): 807 env_manager = EnvironmentManager() 808 env_manager.env_stop() 809 EnvironmentManager() 810 811 @classmethod 812 def start_task_log(cls, log_path): 813 tool_file_name = "task_log.log" 814 tool_log_file = os.path.join(log_path, tool_file_name) 815 add_task_file_handler(tool_log_file) 816 817 @classmethod 818 def start_encrypt_log(cls, log_path): 819 from _core.report.encrypt import check_pub_key_exist 820 if check_pub_key_exist(): 821 encrypt_file_name = "task_log.ept" 822 encrypt_log_file = os.path.join(log_path, encrypt_file_name) 823 add_encrypt_file_handler(encrypt_log_file) 824 825 @classmethod 826 def stop_task_logcat(cls): 827 remove_task_file_handler() 828 829 @classmethod 830 def stop_encrypt_log(cls): 831 remove_encrypt_file_handler() 832 833 @staticmethod 834 def _find_test_root_descriptor(config): 835 if getattr(config, ConfigConst.task, None) or \ 836 getattr(config, ConfigConst.testargs, None): 837 Scheduler._pre_component_test(config) 838 839 if getattr(config, ConfigConst.subsystems, "") or \ 840 getattr(config, ConfigConst.parts, "") or \ 841 getattr(config, ConfigConst.component_base_kit, ""): 842 uid = unique_id("Scheduler", "component") 843 if config.subsystems or config.parts: 844 test_set = (config.subsystems, config.parts) 845 else: 846 kit = getattr(config, ConfigConst.component_base_kit) 847 test_set = kit.get_white_list() 848 849 root = Descriptor(uuid=uid, name="component", 850 source=TestSetSource(test_set), 851 container=True) 852 853 root.children = find_test_descriptors(config) 854 return root 855 # read test list from testdict 856 if getattr(config, ConfigConst.testdict, "") != "" and getattr( 857 config, ConfigConst.testfile, "") == "": 858 uid = unique_id("Scheduler", "testdict") 859 root = Descriptor(uuid=uid, name="testdict", 860 source=TestSetSource(config.testdict), 861 container=True) 862 root.children = find_testdict_descriptors(config) 863 return root 864 865 # read test list from testfile, testlist or task 866 test_set = getattr(config, ConfigConst.testfile, "") or getattr( 867 config, ConfigConst.testlist, "") or getattr( 868 config, ConfigConst.task, "") or getattr( 869 config, ConfigConst.testcase) 870 # read test list from testfile, testlist or task 871 test_set = getattr(config, "testfile", "") or getattr( 872 config, "testlist", "") or getattr(config, "task", "") or getattr( 873 config, "testcase") 874 if test_set: 875 fname, _ = get_filename_extension(test_set) 876 uid = unique_id("Scheduler", fname) 877 root = Descriptor(uuid=uid, name=fname, 878 source=TestSetSource(test_set), container=True) 879 root.children = find_test_descriptors(config) 880 return root 881 else: 882 raise ParamError("no test file, list, dict, case or task found", 883 error_no="00102") 884 885 @classmethod 886 def terminate_cmd_exec(cls): 887 Scheduler.is_execute = False 888 LOG.info("Start to terminate execution") 889 return Scheduler.terminate_result.get() 890 891 @classmethod 892 def upload_case_result(cls, upload_param): 893 if not Scheduler.upload_address: 894 return 895 case_id, result, error, start_time, end_time, report_path = \ 896 upload_param 897 if error and len(error) > MAX_VISIBLE_LENGTH: 898 error = "%s..." % error[:MAX_VISIBLE_LENGTH] 899 LOG.info( 900 "Get upload params: %s, %s, %s, %s, %s, %s" % ( 901 case_id, result, error, start_time, end_time, report_path)) 902 if Scheduler.proxy is not None: 903 Scheduler.proxy.upload_result(case_id, result, error, start_time, 904 end_time, report_path) 905 else: 906 LOG.debug("There is no proxy, can't upload case result") 907 908 @classmethod 909 def upload_module_result(cls, exec_message): 910 if not Scheduler.is_execute: 911 return 912 result_file = exec_message.get_result() 913 request = exec_message.get_request() 914 915 test_name = request.root.source.test_name 916 if not result_file or not os.path.exists(result_file): 917 LOG.error("%s result not exists", test_name, error_no="00200") 918 return 919 920 test_type = request.root.source.test_type 921 LOG.info("Need upload result: %s, test type: %s" % 922 (result_file, test_type)) 923 upload_params, _, _ = cls._get_upload_params(result_file, request) 924 if not upload_params: 925 LOG.error("%s no test case result to upload" % result_file, 926 error_no="00201") 927 return 928 LOG.info("Need upload %s case" % len(upload_params)) 929 upload_suite = [] 930 for upload_param in upload_params: 931 case_id, result, error, start_time, end_time, report_path = \ 932 upload_param 933 case = {"caseid": case_id, "result": result, "error": error, 934 "start": start_time, "end": end_time, 935 "report": report_path} 936 LOG.info("Case info: %s", case) 937 upload_suite.append(case) 938 if Scheduler.proxy is not None: 939 Scheduler.proxy.upload_batch(upload_suite) 940 else: 941 LOG.debug("There is no proxy, can't upload module result") 942 943 @classmethod 944 def _get_upload_params(cls, result_file, request): 945 upload_params = [] 946 report_path = result_file 947 testsuites_element = DataHelper.parse_data_report(report_path) 948 start_time, end_time = cls._get_time(testsuites_element) 949 if request.get_test_type() == HostDrivenTestType.device_test: 950 for model_element in testsuites_element: 951 case_id = model_element.get(ReportConstant.name, "") 952 case_result, error = cls.get_script_result(model_element) 953 if error and len(error) > MAX_VISIBLE_LENGTH: 954 error = "$s..." % error[:MAX_VISIBLE_LENGTH] 955 upload_params.append( 956 (case_id, case_result, error, start_time, 957 end_time, request.config.report_path,)) 958 else: 959 for testsuite_element in testsuites_element: 960 if check_mode(ModeType.developer): 961 module_name = str(get_filename_extension( 962 report_path)[0]).split(".")[0] 963 else: 964 module_name = testsuite_element.get(ReportConstant.name, 965 "none") 966 for case_element in testsuite_element: 967 case_id = cls._get_case_id(case_element, module_name) 968 case_result, error = cls._get_case_result(case_element) 969 if error and len(error) > MAX_VISIBLE_LENGTH: 970 error = "%s..." % error[:MAX_VISIBLE_LENGTH] 971 if case_result == "Ignored": 972 LOG.info("Get upload params: %s result is ignored", 973 case_id) 974 continue 975 upload_params.append( 976 (case_id, case_result, error, start_time, 977 end_time, request.config.report_path,)) 978 return upload_params, start_time, end_time 979 980 @classmethod 981 def get_script_result(cls, model_element): 982 disabled = int(model_element.get(ReportConstant.disabled)) if \ 983 model_element.get(ReportConstant.disabled, "") else 0 984 failures = int(model_element.get(ReportConstant.failures)) if \ 985 model_element.get(ReportConstant.failures, "") else 0 986 errors = int(model_element.get(ReportConstant.errors)) if \ 987 model_element.get(ReportConstant.errors, "") else 0 988 unavailable = int(model_element.get(ReportConstant.unavailable)) if \ 989 model_element.get(ReportConstant.unavailable, "") else 0 990 if failures > 0 or errors > 0: 991 result = "Failed" 992 elif disabled > 0 or unavailable > 0: 993 result = "Unavailable" 994 else: 995 result = "Passed" 996 997 if result == "Passed": 998 return result, "" 999 if Scheduler.mode == ModeType.decc: 1000 result = "Failed" 1001 1002 error_msg = model_element.get(ReportConstant.message, "") 1003 if not error_msg and len(model_element) > 0: 1004 error_msg = model_element[0].get(ReportConstant.message, "") 1005 if not error_msg and len(model_element[0]) > 0: 1006 error_msg = model_element[0][0].get(ReportConstant.message, "") 1007 return result, error_msg 1008 1009 @classmethod 1010 def _get_case_id(cls, case_element, package_name): 1011 class_name = case_element.get(ReportConstant.class_name, "none") 1012 method_name = case_element.get(ReportConstant.name, "none") 1013 case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name, 1014 class_name, method_name) 1015 return case_id 1016 1017 @classmethod 1018 def _get_case_result(cls, case_element): 1019 # get result 1020 case = Case() 1021 case.status = case_element.get(ReportConstant.status, "") 1022 case.result = case_element.get(ReportConstant.result, "") 1023 if case_element.get(ReportConstant.message, ""): 1024 case.message = case_element.get(ReportConstant.message) 1025 if len(case_element) > 0: 1026 if not case.result: 1027 case.result = ReportConstant.false 1028 case.message = case_element[0].get(ReportConstant.message) 1029 if case.is_passed(): 1030 result = "Passed" 1031 elif case.is_failed(): 1032 result = "Failed" 1033 elif case.is_blocked(): 1034 result = "Blocked" 1035 elif case.is_ignored(): 1036 result = "Ignored" 1037 else: 1038 result = "Unavailable" 1039 return result, case.message 1040 1041 @classmethod 1042 def _get_time(cls, testsuite_element): 1043 start_time = testsuite_element.get(ReportConstant.start_time, "") 1044 end_time = testsuite_element.get(ReportConstant.end_time, "") 1045 try: 1046 if start_time and end_time: 1047 start_time = int(time.mktime(time.strptime( 1048 start_time, ReportConstant.time_format)) * 1000) 1049 end_time = int(time.mktime(time.strptime( 1050 end_time, ReportConstant.time_format)) * 1000) 1051 else: 1052 timestamp = str(testsuite_element.get( 1053 ReportConstant.time_stamp, "")).replace("T", " ") 1054 cost_time = testsuite_element.get(ReportConstant.time, "") 1055 if timestamp and cost_time: 1056 try: 1057 end_time = int(time.mktime(time.strptime( 1058 timestamp, ReportConstant.time_format)) * 1000) 1059 except ArithmeticError as error: 1060 LOG.error("Get time error %s" % error) 1061 end_time = int(time.time() * 1000) 1062 start_time = int(end_time - float(cost_time) * 1000) 1063 else: 1064 current_time = int(time.time() * 1000) 1065 start_time, end_time = current_time, current_time 1066 except ArithmeticError as error: 1067 LOG.error("Get time error %s" % error) 1068 current_time = int(time.time() * 1000) 1069 start_time, end_time = current_time, current_time 1070 return start_time, end_time 1071 1072 @classmethod 1073 def upload_task_result(cls, task, error_message=""): 1074 if not Scheduler.task_name: 1075 LOG.info("No need upload summary report") 1076 return 1077 1078 summary_data_report = os.path.join(task.config.report_path, 1079 ReportConstant.summary_data_report) 1080 if not os.path.exists(summary_data_report): 1081 Scheduler.upload_unavailable_result(str( 1082 error_message) or "summary report not exists", 1083 task.config.report_path) 1084 return 1085 1086 task_element = ElementTree.parse(summary_data_report).getroot() 1087 start_time, end_time = cls._get_time(task_element) 1088 task_result = cls._get_task_result(task_element) 1089 error_msg = "" 1090 for child in task_element: 1091 if child.get(ReportConstant.message, ""): 1092 error_msg = "{}{}".format( 1093 error_msg, "%s;" % child.get(ReportConstant.message)) 1094 if error_msg: 1095 error_msg = error_msg[:-1] 1096 cls.upload_case_result((Scheduler.task_name, task_result, 1097 error_msg, start_time, end_time, 1098 task.config.report_path)) 1099 1100 @classmethod 1101 def _get_task_result(cls, task_element): 1102 failures = int(task_element.get(ReportConstant.failures, 0)) 1103 errors = int(task_element.get(ReportConstant.errors, 0)) 1104 disabled = int(task_element.get(ReportConstant.disabled, 0)) 1105 unavailable = int(task_element.get(ReportConstant.unavailable, 0)) 1106 if disabled > 0: 1107 task_result = "Blocked" 1108 elif errors > 0 or failures > 0: 1109 task_result = "Failed" 1110 elif unavailable > 0: 1111 task_result = "Unavailable" 1112 else: 1113 task_result = "Passed" 1114 return task_result 1115 1116 @classmethod 1117 def upload_unavailable_result(cls, error_msg, report_path=""): 1118 start_time = int(time.time() * 1000) 1119 Scheduler.upload_case_result((Scheduler.task_name, "Unavailable", 1120 error_msg, start_time, start_time, 1121 report_path)) 1122 1123 @classmethod 1124 def upload_report_end(cls): 1125 if getattr(cls, "tmp_json", None): 1126 os.remove(cls.tmp_json) 1127 del cls.tmp_json 1128 LOG.info("Upload report end") 1129 if Scheduler.proxy is not None: 1130 Scheduler.proxy.report_end() 1131 else: 1132 LOG.debug("There is no proxy, can't upload report end") 1133 1134 @classmethod 1135 def is_module_need_retry(cls, task, module_name): 1136 failed_flag = False 1137 if check_mode(ModeType.decc): 1138 from xdevice import SuiteReporter 1139 for module, failed in SuiteReporter.get_failed_case_list(): 1140 if module_name == module or str(module_name).split( 1141 ".")[0] == module: 1142 failed_flag = True 1143 break 1144 else: 1145 from xdevice import ResultReporter 1146 history_report_path = \ 1147 getattr(task.config, ConfigConst.history_report_path, "") 1148 params = ResultReporter.get_task_info_params(history_report_path) 1149 if params and params[ReportConst.unsuccessful_params]: 1150 if dict(params[ReportConst.unsuccessful_params]).get(module_name, []): 1151 failed_flag = True 1152 elif dict(params[ReportConst.unsuccessful_params]).get(str(module_name).split(".")[0], []): 1153 failed_flag = True 1154 return failed_flag 1155 1156 @classmethod 1157 def compare_spt_time(cls, kit_spt, device_spt): 1158 if not kit_spt or not device_spt: 1159 return False 1160 try: 1161 kit_time = str(kit_spt).split("-")[:2] 1162 device_time = str(device_spt).split("-")[:2] 1163 k_spt = datetime.datetime.strptime( 1164 "-".join(kit_time), "%Y-%m") 1165 d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m") 1166 except ValueError as value_error: 1167 LOG.debug("Date format is error, %s" % value_error.args) 1168 return False 1169 month_interval = int(k_spt.month) - int(d_spt.month) 1170 year_interval = int(k_spt.year) - int(d_spt.year) 1171 LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)" 1172 % (k_spt.year, k_spt.month, d_spt.year, d_spt.month)) 1173 if year_interval < 0: 1174 return True 1175 if year_interval == 0 and month_interval in range(-11, 3): 1176 return True 1177 if year_interval == 1 and month_interval + 12 in (1, 2): 1178 return True 1179 1180 @classmethod 1181 def _parse_property_value(cls, property_name, driver_request, kit): 1182 test_args = copy.deepcopy( 1183 driver_request.config.get(ConfigConst.testargs, dict())) 1184 property_value = "" 1185 if ConfigConst.pass_through in test_args.keys(): 1186 import json 1187 pt_dict = json.loads(test_args.get(ConfigConst.pass_through, "")) 1188 property_value = pt_dict.get(property_name, None) 1189 elif property_name in test_args.keys: 1190 property_value = test_args.get(property_name, None) 1191 return property_value if property_value else \ 1192 kit.properties.get(property_name, None) 1193 1194 @classmethod 1195 def _calculate_device_options(cls, device_options, environment_config, 1196 options, test_source): 1197 # calculate difference 1198 diff_value = len(environment_config) - len(device_options) 1199 if device_options and diff_value == 0: 1200 return device_options 1201 1202 else: 1203 diff_value = diff_value if diff_value else 1 1204 if str(test_source.source_file).endswith(".bin"): 1205 device_option = DeviceSelectionOption( 1206 options, DeviceLabelType.ipcamera, test_source) 1207 else: 1208 device_option = DeviceSelectionOption( 1209 options, None, test_source) 1210 1211 device_option.source_file = \ 1212 test_source.source_file or test_source.source_string 1213 device_option.required_manager = "device" 1214 device_options.extend([device_option] * diff_value) 1215 LOG.debug("Assign device options and it's length is %s" 1216 % len(device_options)) 1217 return device_options 1218 1219 @classmethod 1220 def update_test_type_in_source(cls, key, value): 1221 LOG.debug("update test type dict in source") 1222 TestDictSource.test_type[key] = value 1223 1224 @classmethod 1225 def update_ext_type_in_source(cls, key, value): 1226 LOG.debug("update ext type dict in source") 1227 TestDictSource.exe_type[key] = value 1228 1229 @classmethod 1230 def _clear_test_dict_source(cls): 1231 TestDictSource.exe_type.clear() 1232 TestDictSource.test_type.clear() 1233 1234 @classmethod 1235 def _pre_component_test(cls, config): 1236 if not config.kits: 1237 return 1238 cur_kit = None 1239 for kit in config.kits: 1240 if kit.__class__.__name__ == CKit.component: 1241 cur_kit = kit 1242 break 1243 if not cur_kit: 1244 return 1245 get_white_list = getattr(cur_kit, "get_white_list", None) 1246 if not callable(get_white_list): 1247 return 1248 subsystems, parts = get_white_list() 1249 if not subsystems and not parts: 1250 return 1251 setattr(config, ConfigConst.component_base_kit, cur_kit) 1252 1253 @classmethod 1254 def component_task_setup(cls, task, module_name): 1255 component_kit = task.config.get(ConfigConst.component_base_kit, None) 1256 if not component_kit: 1257 # only -p -s .you do not care about the components that can be 1258 # supported. you only want to run the use cases of the current 1259 # component 1260 return 1261 LOG.debug("Start component task setup") 1262 _component_mapper = task.config.get(ConfigConst.component_mapper) 1263 _subsystem, _part = _component_mapper.get(module_name) 1264 1265 is_hit = False 1266 # find in cache. if not find, update cache 1267 cache_subsystem, cache_part = component_kit.get_cache() 1268 if _subsystem in cache_subsystem or _part in cache_subsystem: 1269 is_hit = True 1270 if not is_hit: 1271 env_manager = EnvironmentManager() 1272 for _, manager in env_manager.managers.items(): 1273 if getattr(manager, "devices_list", []): 1274 for device in manager.devices_list: 1275 component_kit.__setup__(device) 1276 cache_subsystem, cache_part = component_kit.get_cache() 1277 if _subsystem in cache_subsystem or _part in cache_subsystem: 1278 is_hit = True 1279 if not is_hit: 1280 LOG.warning("%s are skipped, no suitable component found. " 1281 "Require subsystem=%s part=%s, no device match this" 1282 % (module_name, _subsystem, _part)) 1283