1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import os 21import shutil 22import threading 23import time 24from concurrent.futures import ThreadPoolExecutor 25from concurrent.futures import wait 26 27from _core.constants import ModeType 28from _core.constants import ConfigConst 29from _core.constants import ReportConst 30from _core.error import ErrorMessage 31from _core.executor.request import Request 32from _core.logger import platform_logger 33from _core.logger import redirect_driver_log_begin 34from _core.logger import redirect_driver_log_end 35from _core.plugin import Config 36from _core.plugin import get_plugin 37from _core.plugin import Plugin 38from _core.utils import calculate_elapsed_time 39from _core.utils import check_mode 40from _core.utils import get_file_absolute_path 41from _core.utils import get_kit_instances 42from _core.utils import check_device_name 43from _core.utils import check_device_env_index 44from _core.utils import get_repeat_round 45from _core.exception import ParamError 46from _core.exception import ExecuteTerminate 47from _core.exception import DeviceError 48from _core.exception import LiteDeviceError 49from _core.report.reporter_helper import ReportConstant 50from _core.report.reporter_helper import DataHelper 51from _core.report.reporter_helper import Suite 52from _core.report.reporter_helper import Case 53from _core.context.center import Context 54from _core.context.handler import handle_repeat_result 55from _core.context.handler import update_report_xml 56from _core.context.upload import Uploader 57from _core.testkit.json_parser import JsonParser 58 59LOG = platform_logger("Concurrent") 60 61 62class Concurrent: 63 @classmethod 64 def executor_callback(cls, worker): 65 worker_exception = worker.exception() 66 if worker_exception: 67 LOG.error("Worker return exception: {}".format(worker_exception)) 68 69 @classmethod 70 def concurrent_execute(cls, func, params_list, max_size=8): 71 """ 72 Provider the ability to execute target function concurrently 73 :param func: target function name 74 :param params_list: the list of params in these target functions 75 :param max_size: the max size of thread you wanted in thread pool 76 :return: 77 """ 78 with ThreadPoolExecutor(max_size) as executor: 79 future_params = dict() 80 for params in params_list: 81 future = executor.submit(func, *params) 82 future_params.update({future: params}) 83 future.add_done_callback(cls.executor_callback) 84 wait(future_params) # wait all function complete 85 result_list = [] 86 for future in future_params: 87 result_list.append((future.result(), future_params[future])) 88 return result_list 89 90 91class DriversThread(threading.Thread): 92 def __init__(self, test_driver, task, environment, message_queue): 93 super().__init__() 94 self.test_driver = test_driver 95 self.listeners = None 96 self.task = task 97 self.environment = environment 98 self.message_queue = message_queue 99 self.error_message = "" 100 self.module_config_kits = None 101 self.repeat = 1 102 self.repeat_round = 1 103 self.start_time = time.time() 104 105 def set_listeners(self, listeners): 106 self.listeners = listeners 107 if self.environment is None: 108 return 109 110 for listener in listeners: 111 listener.device_sn = self.environment.devices[0].device_sn 112 113 def get_driver_log_file(self, test): 114 self.repeat = Context.get_scheduler().get_repeat_index() 115 self.repeat_round = get_repeat_round(test.unique_id) 116 round_folder = f"round{self.repeat_round}" if self.repeat > 1 else "" 117 log_file = os.path.join( 118 self.task.config.log_path, round_folder, 119 test.source.module_name, ReportConstant.module_run_log) 120 return log_file 121 122 def run(self): 123 driver, test = None, None 124 if self.test_driver and Context.is_executing(): 125 driver, test = self.test_driver 126 if driver is None or test is None: 127 return 128 redirect_driver_log_begin(self.ident, self.get_driver_log_file(test)) 129 LOG.debug("Thread %s start" % self.name) 130 execute_message = ExecuteMessage('', self.environment, self.test_driver, self.name) 131 try: 132 # construct params 133 driver_request = self._get_driver_request(test, execute_message) 134 if driver_request is None: 135 return 136 # setup device 137 self._do_task_setup(driver_request) 138 # driver execute 139 self.reset_device(driver_request.config) 140 driver.__execute__(driver_request) 141 except Exception as exception: 142 error_no = getattr(exception, "error_no", "00000") 143 if self.environment is None: 144 LOG.exception("Exception: %s", exception, exc_info=False, 145 error_no=error_no) 146 else: 147 LOG.exception( 148 "Device: %s, exception: %s" % ( 149 self.environment.__get_serial__(), exception), 150 exc_info=False, error_no=error_no) 151 self.error_message = str(exception) 152 153 finally: 154 self._do_commom_module_kit_teardown() 155 self._handle_finally(driver, test, execute_message) 156 redirect_driver_log_end(self.ident) 157 158 @staticmethod 159 def reset_device(config): 160 if getattr(config, "reboot_per_module", False): 161 for device in config.environment.devices: 162 device.reboot() 163 164 def _handle_finally(self, driver, test, execute_message): 165 source_content = test.source.source_file or test.source.source_string 166 end_time = time.time() 167 LOG.info("Executed: %s, Execution Time: %s" % ( 168 source_content, calculate_elapsed_time(self.start_time, end_time))) 169 170 # inherit history report under retry mode 171 if driver and test: 172 execute_result = driver.__result__() 173 # move result file to round folder when repeat > 1 174 if self.repeat > 1: 175 execute_result = handle_repeat_result( 176 execute_result, self.task.config.report_path, 177 round_folder=f"round{self.repeat_round}") 178 LOG.debug("Execute result: %s" % execute_result) 179 180 # update result xml 181 update_props = { 182 ReportConstant.start_time: time.strftime( 183 ReportConstant.time_format, time.localtime(int(self.start_time))), 184 ReportConstant.end_time: time.strftime( 185 ReportConstant.time_format, time.localtime(int(end_time))), 186 ReportConstant.repeat: str(self.repeat), 187 ReportConstant.round: str(self.repeat_round), 188 ReportConstant.test_type: test.source.test_type 189 } 190 if self.environment is not None: 191 update_props.update({ReportConstant.devices: self.environment.get_description()}) 192 update_report_xml(execute_result, update_props) 193 194 if getattr(self.task.config, "history_report_path", ""): 195 execute_result = self._inherit_execute_result(execute_result, test) 196 execute_message.set_result(execute_result) 197 198 # set execute state 199 if self.error_message: 200 execute_message.set_state(ExecuteMessage.DEVICE_ERROR) 201 else: 202 execute_message.set_state(ExecuteMessage.DEVICE_FINISH) 203 204 # free environment 205 if self.environment: 206 LOG.debug("Thread %s free environment", execute_message.get_thread_name()) 207 Context.get_scheduler().__free_environment__(execute_message.get_environment()) 208 209 LOG.debug("Thread %s put result", self.name) 210 self.message_queue.put(execute_message) 211 LOG.info("Thread %s end", self.name) 212 213 def _do_common_module_kit_setup(self, driver_request): 214 for device in self.environment.devices: 215 setattr(device, ConfigConst.common_module_kits, []) 216 217 for kit in self.module_config_kits: 218 run_flag = False 219 for device in self.environment.devices: 220 if not Context.is_executing(): 221 raise ExecuteTerminate() 222 if not check_device_env_index(device, kit): 223 continue 224 if check_device_name(device, kit): 225 run_flag = True 226 kit_copy = copy.deepcopy(kit) 227 module_kits = getattr(device, ConfigConst.common_module_kits) 228 module_kits.append(kit_copy) 229 kit_copy.__setup__(device, request=driver_request) 230 if not run_flag: 231 err_msg = ErrorMessage.Common.Code_0101004.format(kit.__class__.__name__) 232 LOG.error(err_msg) 233 raise ParamError(err_msg) 234 235 def _do_commom_module_kit_teardown(self): 236 try: 237 for device in self.environment.devices: 238 for kit in getattr(device, ConfigConst.common_module_kits, []): 239 if check_device_name(device, kit, step="teardown"): 240 kit.__teardown__(device) 241 setattr(device, ConfigConst.common_module_kits, []) 242 except Exception as e: 243 LOG.error("common module kit teardown error: {}".format(e)) 244 245 def _do_task_setup(self, driver_request): 246 if check_mode(ModeType.decc) or getattr( 247 driver_request.config, ConfigConst.check_device, False): 248 return 249 250 if self.environment is None: 251 return 252 253 if (hasattr(driver_request.config, ConfigConst.module_config) and 254 getattr(driver_request.config, ConfigConst.module_config, None)): 255 module_config_path = getattr(driver_request.config, ConfigConst.module_config, None) 256 LOG.debug("Common module config path: {}".format(module_config_path)) 257 from xdevice import Variables 258 config_path = get_file_absolute_path(module_config_path, 259 [os.path.join(Variables.exec_dir, "config")]) 260 json_config = JsonParser(config_path) 261 self.module_config_kits = get_kit_instances(json_config, 262 driver_request.config.resource_path, 263 driver_request.config.testcases_path) 264 self._do_common_module_kit_setup(driver_request) 265 266 for device in self.environment.devices: 267 if not getattr(device, ConfigConst.need_kit_setup, True): 268 LOG.debug("Device %s need kit setup is false" % device) 269 continue 270 271 # do task setup for device 272 kits_copy = copy.deepcopy(self.task.config.kits) 273 setattr(device, ConfigConst.task_kits, kits_copy) 274 for kit in getattr(device, ConfigConst.task_kits, []): 275 if not Context.is_executing(): 276 break 277 try: 278 kit.__setup__(device, request=driver_request) 279 except (ParamError, ExecuteTerminate, DeviceError, 280 LiteDeviceError, ValueError, TypeError, 281 SyntaxError, AttributeError) as exception: 282 error_no = getattr(exception, "error_no", "00000") 283 LOG.exception( 284 "Task setup device: %s, exception: %s" % ( 285 self.environment.__get_serial__(), 286 exception), exc_info=False, error_no=error_no) 287 LOG.debug("Set device %s need kit setup to false" % device) 288 setattr(device, ConfigConst.need_kit_setup, False) 289 290 # set product_info to self.task 291 if getattr(driver_request, ConfigConst.product_info, "") and not \ 292 getattr(self.task, ConfigConst.product_info, ""): 293 product_info = getattr(driver_request, ConfigConst.product_info) 294 if not isinstance(product_info, dict): 295 LOG.warning("Product info should be dict, %s", 296 product_info) 297 return 298 setattr(self.task, ConfigConst.product_info, product_info) 299 300 def _get_driver_request(self, root_desc, execute_message): 301 config = Config() 302 config.update(copy.deepcopy(self.task.config).__dict__) 303 config.environment = self.environment 304 if getattr(config, "history_report_path", ""): 305 # modify config.testargs 306 history_report_path = getattr(config, "history_report_path", "") 307 module_name = root_desc.source.module_name 308 unpassed_test_params = self._get_unpassed_test_params( 309 history_report_path, module_name) 310 if not unpassed_test_params: 311 LOG.info("%s all test cases are passed, no need retry", module_name) 312 driver_request = Request(self.name, root_desc, self.listeners, config) 313 execute_message.set_request(driver_request) 314 return None 315 if unpassed_test_params[0] != module_name and \ 316 unpassed_test_params[0] != str(module_name).split(".")[0]: 317 test_args = getattr(config, "testargs", {}) 318 test_params = [] 319 for unpassed_test_param in unpassed_test_params: 320 if unpassed_test_param not in test_params: 321 test_params.append(unpassed_test_param) 322 test_args["test"] = test_params 323 if "class" in test_args.keys(): 324 test_args.pop("class") 325 setattr(config, "testargs", test_args) 326 if getattr(config, "tf_suite", ""): 327 if root_desc.source.module_name in config.tf_suite.keys(): 328 config.tf_suite = config.tf_suite.get( 329 root_desc.source.module_name) 330 else: 331 config.tf_suite = dict() 332 for listener in self.listeners: 333 LOG.debug("Thread %s, listener %s" % (self.name, listener)) 334 driver_request = Request(self.name, root_desc, self.listeners, config) 335 execute_message.set_request(driver_request) 336 return driver_request 337 338 @classmethod 339 def _get_unpassed_test_params(cls, history_report_path, module_name): 340 unpassed_test_params = [] 341 from _core.report.result_reporter import ResultReporter 342 params = ResultReporter.get_task_info_params(history_report_path) 343 if not params: 344 return unpassed_test_params 345 failed_list = [] 346 try: 347 from devicetest.agent.decc import Handler 348 if Handler.DAV.retry_select: 349 for i in Handler.DAV.case_id_list: 350 failed_list.append(i + "#" + i) 351 else: 352 failed_list = params[ReportConst.unsuccessful_params].get(module_name, []) 353 except Exception: 354 failed_list = params[ReportConst.unsuccessful_params].get(module_name, []) 355 if not failed_list: 356 failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], []) 357 unpassed_test_params.extend(failed_list) 358 LOG.debug("Get unpassed test params %s", unpassed_test_params) 359 return unpassed_test_params 360 361 @classmethod 362 def _append_unpassed_test_param(cls, history_report_file, unpassed_test_params): 363 testsuites_element = DataHelper.parse_data_report(history_report_file) 364 for testsuite_element in testsuites_element: 365 suite_name = testsuite_element.get("name", "") 366 suite = Suite() 367 suite.set_cases(testsuite_element) 368 for case in suite.cases: 369 if case.is_passed(): 370 continue 371 unpassed_test_param = "{}#{}#{}".format( 372 suite_name, case.classname, case.name) 373 unpassed_test_params.append(unpassed_test_param) 374 375 def _inherit_execute_result(self, execute_result, root_desc): 376 module_name = root_desc.source.module_name 377 execute_result_name = "%s.xml" % module_name 378 history_execute_result = self._get_history_execute_result( 379 execute_result_name) 380 if not history_execute_result: 381 LOG.warning("%s no history execute result exists", 382 execute_result_name) 383 return execute_result 384 385 if not check_mode(ModeType.decc): 386 if not os.path.exists(execute_result): 387 result_dir = \ 388 os.path.join(self.task.config.report_path, "result") 389 os.makedirs(result_dir, exist_ok=True) 390 target_execute_result = os.path.join(result_dir, 391 execute_result_name) 392 shutil.copyfile(history_execute_result, target_execute_result) 393 LOG.info("Copy %s to %s" % (history_execute_result, 394 target_execute_result)) 395 return target_execute_result 396 397 real_execute_result = self._get_real_execute_result(execute_result) 398 399 # inherit history execute result 400 testsuites_element = DataHelper.parse_data_report(real_execute_result) 401 if self._is_empty_report(testsuites_element): 402 if check_mode(ModeType.decc): 403 LOG.info("Empty report no need to inherit history execute" 404 " result") 405 else: 406 LOG.info("Empty report '%s' no need to inherit history execute" 407 " result", history_execute_result) 408 return execute_result 409 410 real_history_execute_result = self._get_real_history_execute_result( 411 history_execute_result, module_name) 412 413 history_testsuites_element = DataHelper.parse_data_report( 414 real_history_execute_result) 415 if self._is_empty_report(history_testsuites_element): 416 LOG.info("History report '%s' is empty", history_execute_result) 417 return execute_result 418 if check_mode(ModeType.decc): 419 LOG.info("Inherit history execute result") 420 else: 421 LOG.info("Inherit history execute result: %s", 422 history_execute_result) 423 self._inherit_element(history_testsuites_element, testsuites_element) 424 425 if check_mode(ModeType.decc): 426 from xdevice import SuiteReporter 427 SuiteReporter.append_report_result( 428 (execute_result, DataHelper.to_string(testsuites_element))) 429 else: 430 # generate inherit execute result 431 DataHelper.generate_report(testsuites_element, execute_result) 432 return execute_result 433 434 def _inherit_element(self, history_testsuites_element, testsuites_element): 435 for history_testsuite_element in history_testsuites_element: 436 history_testsuite_name = history_testsuite_element.get("name", "") 437 target_testsuite_element = None 438 for testsuite_element in testsuites_element: 439 if history_testsuite_name == testsuite_element.get("name", ""): 440 target_testsuite_element = testsuite_element 441 break 442 443 if target_testsuite_element is None: 444 testsuites_element.append(history_testsuite_element) 445 inherited_test = int(testsuites_element.get( 446 ReportConstant.tests, 0)) + int( 447 history_testsuite_element.get(ReportConstant.tests, 0)) 448 testsuites_element.set(ReportConstant.tests, 449 str(inherited_test)) 450 continue 451 452 pass_num = 0 453 for history_testcase_element in history_testsuite_element: 454 if self._check_testcase_pass(history_testcase_element): 455 target_testsuite_element.append(history_testcase_element) 456 pass_num += 1 457 458 inherited_test = int(target_testsuite_element.get( 459 ReportConstant.tests, 0)) + pass_num 460 target_testsuite_element.set(ReportConstant.tests, 461 str(inherited_test)) 462 inherited_test = int(testsuites_element.get( 463 ReportConstant.tests, 0)) + pass_num 464 testsuites_element.set(ReportConstant.tests, str(inherited_test)) 465 466 def _get_history_execute_result(self, execute_result_name): 467 if execute_result_name.endswith(".xml"): 468 execute_result_name = execute_result_name[:-4] 469 history_execute_result = \ 470 self._get_data_report_from_record(execute_result_name) 471 if history_execute_result: 472 return history_execute_result 473 for root_dir, _, files in os.walk( 474 self.task.config.history_report_path): 475 for result_file in files: 476 if result_file.endswith(execute_result_name): 477 history_execute_result = os.path.abspath( 478 os.path.join(root_dir, result_file)) 479 return history_execute_result 480 481 @classmethod 482 def _check_testcase_pass(cls, history_testcase_element): 483 case = Case() 484 case.result = history_testcase_element.get(ReportConstant.result, "") 485 case.status = history_testcase_element.get(ReportConstant.status, "") 486 case.message = history_testcase_element.get(ReportConstant.message, "") 487 if len(history_testcase_element) > 0: 488 if not case.result: 489 case.result = ReportConstant.false 490 case.message = history_testcase_element[0].get( 491 ReportConstant.message) 492 493 return case.is_passed() 494 495 @classmethod 496 def _is_empty_report(cls, testsuites_element): 497 if len(testsuites_element) < 1: 498 return True 499 if len(testsuites_element) >= 2: 500 return False 501 502 if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0: 503 return True 504 return False 505 506 def _get_data_report_from_record(self, execute_result_name): 507 history_report_path = \ 508 getattr(self.task.config, "history_report_path", "") 509 if history_report_path: 510 from _core.report.result_reporter import ResultReporter 511 params = ResultReporter.get_task_info_params(history_report_path) 512 if params: 513 report_data_dict = dict(params[ReportConst.data_reports]) 514 if execute_result_name in report_data_dict.keys(): 515 return report_data_dict.get(execute_result_name) 516 elif execute_result_name.split(".")[0] in \ 517 report_data_dict.keys(): 518 return report_data_dict.get( 519 execute_result_name.split(".")[0]) 520 return "" 521 522 @classmethod 523 def _get_real_execute_result(cls, execute_result): 524 from xdevice import SuiteReporter 525 LOG.debug("Get real execute result length is: %s" % 526 len(SuiteReporter.get_report_result())) 527 if check_mode(ModeType.decc): 528 for suite_report, report_result in \ 529 SuiteReporter.get_report_result(): 530 if os.path.splitext(suite_report)[0] == \ 531 os.path.splitext(execute_result)[0]: 532 return report_result 533 return "" 534 else: 535 return execute_result 536 537 @classmethod 538 def _get_real_history_execute_result(cls, history_execute_result, 539 module_name): 540 from xdevice import SuiteReporter 541 LOG.debug("Get real history execute result: %s" % 542 SuiteReporter.history_report_result) 543 if check_mode(ModeType.decc): 544 virtual_report_path, report_result = SuiteReporter. \ 545 get_history_result_by_module(module_name) 546 return report_result 547 else: 548 return history_execute_result 549 550 551class DriversDryRunThread(threading.Thread): 552 def __init__(self, test_driver, task, environment, message_queue): 553 super().__init__() 554 self.test_driver = test_driver 555 self.listeners = None 556 self.task = task 557 self.environment = environment 558 self.message_queue = message_queue 559 self.error_message = "" 560 561 def run(self): 562 LOG.debug("Thread %s start" % self.name) 563 start_time = time.time() 564 execute_message = ExecuteMessage('', self.environment, self.test_driver, self.name) 565 driver, test = None, None 566 try: 567 if self.test_driver and Context.is_executing(): 568 # construct params 569 driver, test = self.test_driver 570 driver_request = self._get_driver_request(test, 571 execute_message) 572 if driver_request is None: 573 return 574 575 # setup device 576 self._do_task_setup(driver_request) 577 578 # driver execute 579 self.reset_device(driver_request.config) 580 driver.__dry_run_execute__(driver_request) 581 582 except Exception as exception: 583 error_no = getattr(exception, "error_no", "00000") 584 if self.environment is None: 585 LOG.exception("Exception: %s", exception, exc_info=False, 586 error_no=error_no) 587 else: 588 LOG.exception( 589 "Device: %s, exception: %s" % ( 590 self.environment.__get_serial__(), exception), 591 exc_info=False, error_no=error_no) 592 self.error_message = str(exception) 593 594 finally: 595 self._handle_finally(driver, execute_message, start_time, test) 596 597 @staticmethod 598 def reset_device(config): 599 if getattr(config, "reboot_per_module", False): 600 for device in config.environment.devices: 601 device.reboot() 602 603 def _handle_finally(self, driver, execute_message, start_time, test): 604 source_content = (self.test_driver[1].source.source_file 605 or self.test_driver[1].source.source_string) 606 LOG.info("Executed: %s, Execution Time: %s" % ( 607 source_content, calculate_elapsed_time(start_time, time.time()))) 608 609 # set execute state 610 if self.error_message: 611 execute_message.set_state(ExecuteMessage.DEVICE_ERROR) 612 else: 613 execute_message.set_state(ExecuteMessage.DEVICE_FINISH) 614 615 # free environment 616 if self.environment: 617 LOG.debug("Thread %s free environment", execute_message.get_thread_name()) 618 Context.get_scheduler().__free_environment__(execute_message.get_environment()) 619 620 LOG.debug("Thread %s put result", self.name) 621 self.message_queue.put(execute_message) 622 623 def _do_task_setup(self, driver_request): 624 if check_mode(ModeType.decc) or getattr( 625 driver_request.config, ConfigConst.check_device, False): 626 return 627 628 if self.environment is None: 629 return 630 631 for device in self.environment.devices: 632 if not getattr(device, ConfigConst.need_kit_setup, True): 633 LOG.debug("Device %s need kit setup is false" % device) 634 continue 635 636 # do task setup for device 637 kits_copy = copy.deepcopy(self.task.config.kits) 638 setattr(device, ConfigConst.task_kits, kits_copy) 639 for kit in getattr(device, ConfigConst.task_kits, []): 640 if not Context.is_executing(): 641 break 642 try: 643 kit.__setup__(device, request=driver_request) 644 except (ParamError, ExecuteTerminate, DeviceError, 645 LiteDeviceError, ValueError, TypeError, 646 SyntaxError, AttributeError) as exception: 647 error_no = getattr(exception, "error_no", "00000") 648 LOG.exception( 649 "Task setup device: %s, exception: %s" % ( 650 self.environment.__get_serial__(), 651 exception), exc_info=False, error_no=error_no) 652 LOG.debug("Set device %s need kit setup to false" % device) 653 setattr(device, ConfigConst.need_kit_setup, False) 654 655 # set product_info to self.task 656 if getattr(driver_request, ConfigConst.product_info, "") and not \ 657 getattr(self.task, ConfigConst.product_info, ""): 658 product_info = getattr(driver_request, ConfigConst.product_info) 659 if not isinstance(product_info, dict): 660 LOG.warning("Product info should be dict, %s", 661 product_info) 662 return 663 setattr(self.task, ConfigConst.product_info, product_info) 664 665 def _get_driver_request(self, root_desc, execute_message): 666 config = Config() 667 config.update(copy.deepcopy(self.task.config).__dict__) 668 config.environment = self.environment 669 if self.listeners: 670 for listener in self.listeners: 671 LOG.debug("Thread %s, listener %s" % (self.name, listener)) 672 driver_request = Request(self.name, root_desc, self.listeners, config) 673 execute_message.set_request(driver_request) 674 return driver_request 675 676 677class QueueMonitorThread(threading.Thread): 678 679 def __init__(self, message_queue, current_driver_threads, test_drivers): 680 super().__init__() 681 self.message_queue = message_queue 682 self.current_driver_threads = current_driver_threads 683 self.test_drivers = test_drivers 684 685 def check_current_thread_status(self): 686 for tid_key in self.current_driver_threads.keys(): 687 if self.current_driver_threads[tid_key].is_alive(): 688 LOG.debug("Running thread is alive, thread is {}.".format(tid_key)) 689 else: 690 # if error need free thread environment 691 LOG.debug("Running thread is dead, thread is {}".format(tid_key)) 692 693 def run(self): 694 LOG.debug("Queue monitor thread start") 695 while self.test_drivers or self.current_driver_threads: 696 if not self.current_driver_threads: 697 time.sleep(3) 698 continue 699 execute_message = self.message_queue.get() 700 701 self.current_driver_threads.pop(execute_message.get_thread_name()) 702 703 if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH: 704 LOG.debug("Thread %s execute finished" % execute_message.get_thread_name()) 705 elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR: 706 LOG.debug("Thread %s execute error" % execute_message.get_thread_name()) 707 Uploader.upload_module_result(execute_message) 708 709 LOG.debug("Queue monitor thread end") 710 if not Context.is_executing(): 711 LOG.info("Terminate success") 712 Context.get_scheduler().terminate_result.put("terminate success") 713 714 715class ExecuteMessage: 716 DEVICE_RUN = 'device_run' 717 DEVICE_FINISH = 'device_finish' 718 DEVICE_ERROR = 'device_error' 719 720 def __init__(self, state, environment, drivers, thread_name): 721 self.state = state 722 self.environment = environment 723 self.drivers = drivers 724 self.thread_name = thread_name 725 self.request = None 726 self.result = None 727 728 def set_state(self, state): 729 self.state = state 730 731 def get_state(self): 732 return self.state 733 734 def set_request(self, request): 735 self.request = request 736 737 def get_request(self): 738 return self.request 739 740 def set_result(self, result): 741 self.result = result 742 743 def get_result(self): 744 return self.result 745 746 def get_environment(self): 747 return self.environment 748 749 def get_thread_name(self): 750 return self.thread_name 751 752 def get_drivers(self): 753 return self.drivers 754 755 756class ModuleThread(DriversThread): 757 758 def __init__(self, test_driver, task, environment, message_queue, lock): 759 super().__init__(test_driver, task, environment, message_queue) 760 self.lock = lock 761 762 def run(self): 763 driver, test = None, None 764 if self.test_driver and Context.is_executing(): 765 driver, test = self.test_driver 766 if driver is None or test is None: 767 return 768 redirect_driver_log_begin(self.ident, self.get_driver_log_file(test)) 769 LOG.debug("Thread %s start" % self.name) 770 execute_message = ExecuteMessage('', self.environment, self.test_driver, self.name) 771 driver, test = None, None 772 try: 773 # construct params 774 driver, test = self.test_driver 775 driver_request = self._get_driver_request(test, 776 execute_message) 777 if driver_request is None: 778 return 779 780 # setup device 781 self._do_task_setup(driver_request) 782 783 # do common kit setup 784 self._do_common_kit_setup(self.environment, self.task, self.test_driver) 785 786 # driver execute 787 self.reset_device(driver_request.config) 788 driver.__execute__(driver_request) 789 except Exception as exception: 790 error_no = getattr(exception, "error_no", "00000") 791 if self.environment is None: 792 LOG.exception("Exception: %s", exception, exc_info=False, 793 error_no=error_no) 794 else: 795 LOG.exception( 796 "Device: %s, exception: %s" % ( 797 self.environment.__get_serial__(), exception), 798 exc_info=False, error_no=error_no) 799 self.error_message = str(exception) 800 801 finally: 802 # do common kit teardown 803 self.__do_common_kit_teardown() 804 self._handle_finally(driver, test, execute_message) 805 redirect_driver_log_end(self.ident) 806 807 def _do_common_kit_setup(self, environment, task, test_driver): 808 LOG.info("Do common setup kit") 809 common_kits = task.root.common_kits 810 module_subsystem = test_driver[1].source.module_subsystem 811 try: 812 LOG.info("lock acquire") 813 self.lock.acquire() 814 task.root.task_list.update({module_subsystem: task.root.task_list.get(module_subsystem) - 1}) 815 except Exception as e: 816 LOG.error(e) 817 finally: 818 self.lock.release() 819 LOG.info("lock release") 820 kits = common_kits.get(module_subsystem, None) 821 if kits: 822 new_kits = self.get_kit_instances(kits, task.config.resource_path, task.config.testcases_path) 823 LOG.info(new_kits) 824 for device in environment.devices: 825 if not getattr(device, "current_subsystem_kit", None): 826 setattr(device, "current_subsystem_kit", module_subsystem) 827 setattr(device, "common_kits", new_kits) 828 self.__do_common_kit_setup(device) 829 elif getattr(device, "current_subsystem_kit") != module_subsystem: 830 self.__do_common_kit_teardown() 831 setattr(device, "current_subsystem_kit", module_subsystem) 832 setattr(device, "common_kits", new_kits) 833 self.__do_common_kit_setup(device) 834 835 def __do_common_kit_teardown(self): 836 for device in self.environment.devices: 837 module_subsystem = getattr(device, "current_subsystem_kit", None) 838 if module_subsystem: 839 try: 840 LOG.info("lock acquire") 841 self.lock.acquire() 842 LOG.info(self.task.root.task_list.get(module_subsystem)) 843 if self.task.root.task_list.get(module_subsystem) <= 0: 844 LOG.info("do common kit teardown") 845 for kit in getattr(device, ConfigConst.common_kits, []): 846 kit.__teardown__(device) 847 setattr(device, ConfigConst.common_kits, []) 848 setattr(device, "current_subsystem_kit", None) 849 except Exception as e: 850 LOG.error(e) 851 finally: 852 self.lock.release() 853 LOG.info("lock release") 854 855 @staticmethod 856 def __do_common_kit_setup(device): 857 for kit in getattr(device, ConfigConst.common_kits, []): 858 if not Context.is_executing(): 859 raise ExecuteTerminate() 860 kit.__setup__(device, request=None) 861 862 @staticmethod 863 def get_kit_instances(kits, resource_path, testcases_path): 864 # get kit instances 865 kit_instances = [] 866 for kit in kits: 867 kit["paths"] = [resource_path, testcases_path] 868 kit_type = kit.get("type", "") 869 device_name = kit.get("device_name", None) 870 if get_plugin(plugin_type=Plugin.TEST_KIT, plugin_id=kit_type): 871 test_kit = \ 872 get_plugin(plugin_type=Plugin.TEST_KIT, plugin_id=kit_type)[0] 873 test_kit_instance = test_kit.__class__() 874 test_kit_instance.__check_config__(kit) 875 setattr(test_kit_instance, "device_name", device_name) 876 kit_instances.append(test_kit_instance) 877 else: 878 raise ParamError(ErrorMessage.Common.Code_0101003.format(kit_type)) 879 return kit_instances 880