#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import shutil
import threading
import time
from ast import literal_eval
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from xml.etree import ElementTree

from _core.constants import ConfigConst
from _core.constants import FilePermission
from _core.constants import ModeType
from _core.constants import ReportConst
from _core.executor.request import Request
from _core.logger import platform_logger
from _core.logger import redirect_driver_log_begin
from _core.logger import redirect_driver_log_end
from _core.plugin import Config
from _core.utils import calculate_elapsed_time
from _core.utils import get_instance_name
from _core.utils import check_mode
from _core.utils import get_file_absolute_path
from _core.utils import get_kit_instances
from _core.utils import check_device_name
from _core.utils import check_device_env_index
from _core.exception import ParamError
from _core.exception import ExecuteTerminate
from _core.exception import DeviceError
from _core.exception import LiteDeviceError
from _core.report.reporter_helper import ReportConstant
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import Suite
from _core.report.reporter_helper import Case
from _core.testkit.json_parser import JsonParser

LOG = platform_logger("Concurrent")


def _report_thread_exception(environment, exception):
    """Log an exception raised in a driver thread and build its summary.

    :param environment: environment the thread runs on, or None
    :param exception: the exception caught by the thread
    :return: "<exception type name>: <message>" text for the thread's
        ``error_message``
    """
    error_no = getattr(exception, "error_no", "00000")
    if environment is None:
        LOG.exception("Exception: %s", exception, exc_info=False,
                      error_no=error_no)
    else:
        LOG.exception(
            "Device: %s, exception: %s" % (
                environment.__get_serial__(), exception),
            exc_info=False, error_no=error_no)
    return "{}: {}".format(get_instance_name(exception), str(exception))


def _setup_task_kits(environment, task, driver_request):
    """Run the task level kits on every device that still needs them.

    Each device receives its own deep copy of ``task.config.kits``. Setup
    errors of the listed exception types are logged and do not stop the
    remaining kits. Afterwards the device is flagged so the kits are not set
    up again.

    :param environment: environment whose ``devices`` are prepared
    :param task: task providing ``config.kits``
    :param driver_request: request handed to every ``kit.__setup__`` call
    """
    # imported lazily, as everywhere else in this module
    from xdevice import Scheduler
    for device in environment.devices:
        if not getattr(device, ConfigConst.need_kit_setup, True):
            LOG.debug("Device %s need kit setup is false" % device)
            continue

        # do task setup for device
        kits_copy = copy.deepcopy(task.config.kits)
        setattr(device, ConfigConst.task_kits, kits_copy)
        for kit in getattr(device, ConfigConst.task_kits, []):
            if not Scheduler.is_execute:
                break
            try:
                kit.__setup__(device, request=driver_request)
            except (ParamError, ExecuteTerminate, DeviceError,
                    LiteDeviceError, ValueError, TypeError,
                    SyntaxError, AttributeError) as exception:
                error_no = getattr(exception, "error_no", "00000")
                LOG.exception(
                    "Task setup device: %s, exception: %s" % (
                        environment.__get_serial__(),
                        exception), exc_info=False, error_no=error_no)
        LOG.debug("Set device %s need kit setup to false" % device)
        setattr(device, ConfigConst.need_kit_setup, False)


def _sync_product_info(task, driver_request):
    """Copy product info from the request to the task if the task has none.

    Nothing is copied (a warning is logged) when the request's product info
    is not a dict.
    """
    if not getattr(driver_request, ConfigConst.product_info, ""):
        return
    if getattr(task, ConfigConst.product_info, ""):
        return
    product_info = getattr(driver_request, ConfigConst.product_info)
    if not isinstance(product_info, dict):
        LOG.warning("Product info should be dict, %s",
                    product_info)
        return
    setattr(task, ConfigConst.product_info, product_info)


class Concurrent:
    """Helper for running one function concurrently over many parameters."""

    @classmethod
    def executor_callback(cls, worker):
        """Log the exception of a finished future, if it raised one."""
        worker_exception = worker.exception()
        if worker_exception:
            LOG.error("Worker return exception: {}".format(worker_exception))

    @classmethod
    def concurrent_execute(cls, func, params_list, max_size=8):
        """
        Provide the ability to execute the target function concurrently
        :param func: target function
        :param params_list: iterable of positional-argument sequences, one
            per call of func
        :param max_size: the max number of threads in the thread pool
        :return: list of (result, params) tuples in submission order;
            collecting the result of a call that raised re-raises its
            exception
        """
        with ThreadPoolExecutor(max_size) as executor:
            future_params = {}
            for params in params_list:
                future = executor.submit(func, *params)
                future_params[future] = params
                future.add_done_callback(cls.executor_callback)
            wait(future_params)  # wait all function complete
            return [(future.result(), params)
                    for future, params in future_params.items()]


class DriversThread(threading.Thread):
    """Thread that runs one (driver, test) pair on one environment."""

    def __init__(self, test_driver, task, environment, message_queue):
        """
        :param test_driver: (driver, test) pair to execute, may be empty
        :param task: task providing the shared ``config``
        :param environment: environment to run on, or None
        :param message_queue: queue that receives the ExecuteMessage
        """
        threading.Thread.__init__(self)
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        self.error_message = ""
        self.module_config_kits = None
        # taken at construction time, used for the elapsed time report
        self.start_time = time.time()

    def set_listeners(self, listeners):
        """Store the listeners and tag them with the first device's sn."""
        self.listeners = listeners
        if self.environment is None:
            return

        for listener in listeners:
            listener.device_sn = self.environment.devices[0].device_sn

    def set_thread_id(self, thread_id):
        self.thread_id = thread_id

    def get_driver_log_file(self, test):
        """Return the path of the module run log for the given test."""
        log_file = os.path.join(
            self.task.config.log_path,
            test.source.module_name, ReportConstant.module_run_log)
        return log_file

    def run(self):
        """Execute the driver and always post an ExecuteMessage afterwards.

        Returns without doing anything when there is no test driver or the
        scheduler is no longer executing.
        """
        from xdevice import Scheduler
        driver, test = None, None
        if self.test_driver and Scheduler.is_execute:
            driver, test = self.test_driver
        if driver is None or test is None:
            # NOTE(review): no message is queued on this path - confirm the
            # scheduler does not wait for this thread in that case.
            return
        redirect_driver_log_begin(self.ident, self.get_driver_log_file(test))
        LOG.debug("Thread id: %s start" % self.thread_id)
        execute_message = ExecuteMessage(
            '', self.environment, self.test_driver, self.thread_id)
        try:
            # construct params
            driver_request = self._get_driver_request(test, execute_message)
            if driver_request is None:
                return
            # setup device
            self._do_task_setup(driver_request)
            # driver execute
            self.reset_device(driver_request.config)
            driver.__execute__(driver_request)
        except Exception as exception:
            self.error_message = _report_thread_exception(
                self.environment, exception)

        finally:
            try:
                self._do_common_module_kit_teardown()
                self._handle_finally(driver, test, execute_message)
            finally:
                # the log redirection must end even if the cleanup failed
                redirect_driver_log_end(self.ident)

    @staticmethod
    def reset_device(config):
        """Reboot every device when ``reboot_per_module`` is configured."""
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    @staticmethod
    def update_report_xml(result_xml, props):
        """update devices, start time, end time, etc. to the result file

        :param result_xml: path of the result xml file
        :param props: mapping of attribute name to value set on the root
        """
        if not result_xml or not os.path.exists(result_xml) or not props:
            return
        try:
            root = ElementTree.parse(result_xml).getroot()
        except ElementTree.ParseError as e:
            LOG.error(f"parse result xml error! xml file {result_xml}")
            LOG.error(f"error message: {e}")
            return
        for k, v in props.items():
            if k == ReportConstant.devices:
                v = literal_eval(str(v))
            root.set(k, v)
        # serialize before the file is truncated, so that a serialization
        # error cannot destroy the existing report
        content = ElementTree.tostring(root).decode()
        result_fd = os.open(result_xml, os.O_CREAT | os.O_WRONLY | os.O_TRUNC,
                            FilePermission.mode_644)
        with os.fdopen(result_fd, mode="w", encoding="utf-8") as result_file:
            result_file.write(content)

    def _collect_execute_result(self, driver, test, end_time):
        """Update the driver's result file and return the result to report.

        Under retry mode (``history_report_path`` configured) the history
        result is inherited into the returned result.
        """
        execute_result = driver.__result__()

        # update result xml
        update_props = {
            ReportConstant.start_time: time.strftime(
                ReportConstant.time_format,
                time.localtime(int(self.start_time))),
            ReportConstant.end_time: time.strftime(
                ReportConstant.time_format, time.localtime(int(end_time))),
            ReportConstant.test_type: test.source.test_type
        }
        if self.environment is not None:
            update_props.update(
                {ReportConstant.devices: self.environment.get_description()})
        self.update_report_xml(execute_result, update_props)
        LOG.debug("Execute result: %s" % execute_result)
        # inherit history report under retry mode
        if getattr(self.task.config, "history_report_path", ""):
            execute_result = self._inherit_execute_result(
                execute_result, test)
        return execute_result

    def _handle_finally(self, driver, test, execute_message):
        """Record result and state, free the environment, post the message.

        A failure while processing the result is logged and recorded as the
        thread error, so that the environment is still freed and the message
        is still queued.
        """
        from xdevice import Scheduler
        source_content = test.source.source_file or test.source.source_string
        end_time = time.time()
        LOG.info("Executed: %s, Execution Time: %s" % (
            source_content, calculate_elapsed_time(self.start_time, end_time)))

        if driver and test:
            try:
                execute_message.set_result(
                    self._collect_execute_result(driver, test, end_time))
            except Exception as exception:
                LOG.error("Handle execute result error: {}".format(exception))
                if not self.error_message:
                    self.error_message = "{}: {}".format(
                        get_instance_name(exception), str(exception))

        # set execute state
        if self.error_message:
            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
        else:
            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

        # free environment
        if self.environment:
            LOG.debug("Thread %s free environment",
                      execute_message.get_thread_id())
            Scheduler.__free_environment__(execute_message.get_environment())

        LOG.debug("Put thread %s result", self.thread_id)
        self.message_queue.put(execute_message)
        LOG.info("")

    def _do_common_module_kit_setup(self, driver_request):
        """Set up the common module kits on every matching device.

        :raises ExecuteTerminate: when the scheduler stopped executing
        :raises ParamError: when a kit matches no device of the environment
        """
        from xdevice import Scheduler
        for device in self.environment.devices:
            setattr(device, ConfigConst.common_module_kits, [])
        for kit in self.module_config_kits:
            run_flag = False
            for device in self.environment.devices:
                if not Scheduler.is_execute:
                    raise ExecuteTerminate()
                if not check_device_env_index(device, kit):
                    continue
                if check_device_name(device, kit):
                    run_flag = True
                    # each device works on its own copy of the kit
                    kit_copy = copy.deepcopy(kit)
                    module_kits = getattr(
                        device, ConfigConst.common_module_kits)
                    module_kits.append(kit_copy)
                    kit_copy.__setup__(device, request=driver_request)
            if not run_flag:
                kit_device_name = getattr(kit, "device_name", None)
                error_msg = "device name '%s' of '%s' not exist" % (
                    kit_device_name, kit.__class__.__name__)
                LOG.error(error_msg, error_no="00108")
                raise ParamError(error_msg, error_no="00108")

    def _do_common_module_kit_teardown(self):
        """Tear down the common module kits of every device (best effort).

        Errors are logged and never raised; a failing kit does not prevent
        the teardown of the remaining kits.
        """
        if self.environment is None:
            return
        try:
            for device in self.environment.devices:
                for kit in getattr(device, ConfigConst.common_module_kits, []):
                    try:
                        if check_device_name(device, kit, step="teardown"):
                            kit.__teardown__(device)
                    except Exception as e:
                        LOG.error(
                            "Common module kit teardown error: {}".format(e))
                setattr(device, ConfigConst.common_module_kits, [])
        except Exception as e:
            LOG.error("Common module kit teardown error: {}".format(e))

    def _do_task_setup(self, driver_request):
        """Set up common module kits and task kits before the driver runs.

        Skipped in decc mode, when ``check_device`` is configured and when
        the thread has no environment.
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        module_config_path = getattr(
            driver_request.config, ConfigConst.module_config, None)
        if module_config_path:
            LOG.debug("Common module config path: {}".format(
                module_config_path))
            from xdevice import Variables
            config_path = get_file_absolute_path(
                module_config_path,
                [os.path.join(Variables.exec_dir, "config")])
            json_config = JsonParser(config_path)
            self.module_config_kits = get_kit_instances(
                json_config,
                driver_request.config.resource_path,
                driver_request.config.testcases_path)
            self._do_common_module_kit_setup(driver_request)

        _setup_task_kits(self.environment, self.task, driver_request)

        # set product_info to self.task
        _sync_product_info(self.task, driver_request)

    def _get_driver_request(self, root_desc, execute_message):
        """Build the Request for the driver and attach it to the message.

        :return: the request, or None when retrying against a history
            report in which the module has no unpassed test case
        """
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        history_report_path = getattr(config, "history_report_path", "")
        if history_report_path:
            # modify config.testargs
            module_name = root_desc.source.module_name
            unpassed_test_params = self._get_unpassed_test_params(
                history_report_path, module_name)
            if not unpassed_test_params:
                LOG.info("%s all test cases are passed, no need retry",
                         module_name)
                driver_request = Request(self.thread_id, root_desc,
                                         self.listeners, config)
                execute_message.set_request(driver_request)
                return None
            if unpassed_test_params[0] != module_name and \
                    unpassed_test_params[0] != str(module_name).split(".")[0]:
                test_args = getattr(config, "testargs", {})
                # drop duplicates, keep the first occurrence order
                test_args["test"] = list(dict.fromkeys(unpassed_test_params))
                test_args.pop("class", None)
                setattr(config, "testargs", test_args)
        if getattr(config, "tf_suite", ""):
            if root_desc.source.module_name in config.tf_suite:
                config.tf_suite = config.tf_suite.get(
                    root_desc.source.module_name)
            else:
                config.tf_suite = dict()
        # listeners stay None until set_listeners is called
        for listener in self.listeners or []:
            LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request

    @classmethod
    def _get_unpassed_test_params(cls, history_report_path, module_name):
        """Return the unsuccessful test params recorded for the module.

        Falls back to the part of the module name before the first "." when
        nothing is recorded under the full name.
        """
        unpassed_test_params = []
        from _core.report.result_reporter import ResultReporter
        params = ResultReporter.get_task_info_params(history_report_path)
        if not params:
            return unpassed_test_params

        def recorded_failures(name):
            return params[ReportConst.unsuccessful_params].get(name, [])

        failed_list = []
        try:
            from devicetest.agent.decc import Handler
            if Handler.DAV.retry_select:
                for i in Handler.DAV.case_id_list:
                    failed_list.append(i + "#" + i)
            else:
                failed_list = recorded_failures(module_name)
        except Exception:
            # any failure of the devicetest lookup (including the import)
            # falls back to the recorded params
            failed_list = recorded_failures(module_name)
        if not failed_list:
            failed_list = recorded_failures(str(module_name).split(".")[0])
        unpassed_test_params.extend(failed_list)
        LOG.debug("Get unpassed test params %s", unpassed_test_params)
        return unpassed_test_params

    @classmethod
    def _append_unpassed_test_param(cls, history_report_file,
                                    unpassed_test_params):
        """Append "suite#class#name" of every unpassed case in the report."""
        testsuites_element = DataHelper.parse_data_report(history_report_file)
        for testsuite_element in testsuites_element:
            suite_name = testsuite_element.get("name", "")
            suite = Suite()
            suite.set_cases(testsuite_element)
            for case in suite.cases:
                if case.is_passed():
                    continue
                unpassed_test_param = "{}#{}#{}".format(
                    suite_name, case.classname, case.name)
                unpassed_test_params.append(unpassed_test_param)

    def _inherit_execute_result(self, execute_result, root_desc):
        """Merge the passed cases of the history result into the result.

        :param execute_result: result of the current execution
        :param root_desc: test descriptor providing the module name
        :return: the result to report; outside decc mode this is the copied
            history file when no current result file exists
        """
        module_name = root_desc.source.module_name
        execute_result_name = "%s.xml" % module_name
        history_execute_result = self._get_history_execute_result(
            execute_result_name)
        if not history_execute_result:
            LOG.warning("%s no history execute result exists",
                        execute_result_name)
            return execute_result

        if not check_mode(ModeType.decc):
            if not os.path.exists(execute_result):
                # nothing was produced this time: reuse the history result
                result_dir = \
                    os.path.join(self.task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = os.path.join(result_dir,
                                                     execute_result_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("Copy %s to %s" % (history_execute_result,
                                            target_execute_result))
                return target_execute_result

        real_execute_result = self._get_real_execute_result(execute_result)

        # inherit history execute result
        testsuites_element = DataHelper.parse_data_report(real_execute_result)
        if self._is_empty_report(testsuites_element):
            if check_mode(ModeType.decc):
                LOG.info("Empty report no need to inherit history execute"
                         " result")
            else:
                LOG.info("Empty report '%s' no need to inherit history execute"
                         " result", history_execute_result)
            return execute_result

        real_history_execute_result = self._get_real_history_execute_result(
            history_execute_result, module_name)

        history_testsuites_element = DataHelper.parse_data_report(
            real_history_execute_result)
        if self._is_empty_report(history_testsuites_element):
            LOG.info("History report '%s' is empty", history_execute_result)
            return execute_result
        if check_mode(ModeType.decc):
            LOG.info("Inherit history execute result")
        else:
            LOG.info("Inherit history execute result: %s",
                     history_execute_result)
        self._inherit_element(history_testsuites_element, testsuites_element)

        if check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            SuiteReporter.append_report_result(
                (execute_result, DataHelper.to_string(testsuites_element)))
        else:
            # generate inherit execute result
            DataHelper.generate_report(testsuites_element, execute_result)
        return execute_result

    def _inherit_element(self, history_testsuites_element, testsuites_element):
        """Add history suites / passed history cases to the current report.

        A history suite without a same-named current suite is appended as a
        whole; otherwise only its passed cases are appended. The ``tests``
        counters are increased accordingly.
        """
        for history_testsuite_element in history_testsuites_element:
            history_testsuite_name = history_testsuite_element.get("name", "")
            target_testsuite_element = None
            for testsuite_element in testsuites_element:
                if history_testsuite_name == testsuite_element.get("name", ""):
                    target_testsuite_element = testsuite_element
                    break

            if target_testsuite_element is None:
                testsuites_element.append(history_testsuite_element)
                inherited_test = int(testsuites_element.get(
                    ReportConstant.tests, 0)) + int(
                    history_testsuite_element.get(ReportConstant.tests, 0))
                testsuites_element.set(ReportConstant.tests,
                                       str(inherited_test))
                continue

            pass_num = 0
            for history_testcase_element in history_testsuite_element:
                if self._check_testcase_pass(history_testcase_element):
                    target_testsuite_element.append(history_testcase_element)
                    pass_num += 1

            inherited_test = int(target_testsuite_element.get(
                ReportConstant.tests, 0)) + pass_num
            target_testsuite_element.set(ReportConstant.tests,
                                         str(inherited_test))
            inherited_test = int(testsuites_element.get(
                ReportConstant.tests, 0)) + pass_num
            testsuites_element.set(ReportConstant.tests, str(inherited_test))

    def _get_history_execute_result(self, execute_result_name):
        """Locate the history result of a module.

        The recorded data reports are consulted first; otherwise the history
        report directory is walked and the last file whose name ends with
        the module name, with or without ".xml", is returned.

        :param execute_result_name: module name, optionally with ".xml"
        :return: the history result, or a false value when none is found
        """
        if execute_result_name.endswith(".xml"):
            execute_result_name = execute_result_name[:-4]
        history_execute_result = \
            self._get_data_report_from_record(execute_result_name)
        if history_execute_result:
            return history_execute_result
        # the ".xml" suffix was stripped above, so it has to be accepted
        # again here or "<module>.xml" files could never match
        suffixes = (execute_result_name, execute_result_name + ".xml")
        for root_dir, _, files in os.walk(
                self.task.config.history_report_path):
            for result_file in files:
                if result_file.endswith(suffixes):
                    history_execute_result = os.path.abspath(
                        os.path.join(root_dir, result_file))
        return history_execute_result

    @classmethod
    def _check_testcase_pass(cls, history_testcase_element):
        """Return whether the history test case element counts as passed."""
        case = Case()
        case.result = history_testcase_element.get(ReportConstant.result, "")
        case.status = history_testcase_element.get(ReportConstant.status, "")
        case.message = history_testcase_element.get(ReportConstant.message, "")
        if len(history_testcase_element) > 0:
            # the case has child elements: take the message from the first
            if not case.result:
                case.result = ReportConstant.false
            case.message = history_testcase_element[0].get(
                ReportConstant.message)

        return case.is_passed()

    @classmethod
    def _is_empty_report(cls, testsuites_element):
        """Return True for no suite or one suite with unavailable > 0."""
        if len(testsuites_element) < 1:
            return True
        if len(testsuites_element) >= 2:
            return False

        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
            return True
        return False

    def _get_data_report_from_record(self, execute_result_name):
        """Look the data report up in the recorded task info params.

        :return: the recorded report for the name (or for its part before
            the first "."), "" when nothing is recorded
        """
        history_report_path = \
            getattr(self.task.config, "history_report_path", "")
        if not history_report_path:
            return ""
        from _core.report.result_reporter import ResultReporter
        params = ResultReporter.get_task_info_params(history_report_path)
        if not params:
            return ""
        report_data_dict = dict(params[ReportConst.data_reports])
        if execute_result_name in report_data_dict:
            return report_data_dict.get(execute_result_name)
        short_name = execute_result_name.split(".")[0]
        if short_name in report_data_dict:
            return report_data_dict.get(short_name)
        return ""

    @classmethod
    def _get_real_execute_result(cls, execute_result):
        """Return the report to parse for the current execution.

        In decc mode the report result stored in SuiteReporter under the
        same name (extension ignored) is returned, "" when there is none;
        otherwise ``execute_result`` itself.
        """
        from xdevice import SuiteReporter
        LOG.debug("Get real execute result length is: %s" %
                  len(SuiteReporter.get_report_result()))
        if not check_mode(ModeType.decc):
            return execute_result
        for suite_report, report_result in \
                SuiteReporter.get_report_result():
            if os.path.splitext(suite_report)[0] == \
                    os.path.splitext(execute_result)[0]:
                return report_result
        return ""

    @classmethod
    def _get_real_history_execute_result(cls, history_execute_result,
                                         module_name):
        """Return the history report to parse.

        In decc mode this is the history result SuiteReporter holds for the
        module; otherwise ``history_execute_result`` itself.
        """
        from xdevice import SuiteReporter
        LOG.debug("Get real history execute result: %s" %
                  SuiteReporter.history_report_result)
        if check_mode(ModeType.decc):
            _, report_result = SuiteReporter. \
                get_history_result_by_module(module_name)
            return report_result
        return history_execute_result


class DriversDryRunThread(threading.Thread):
    """Thread that dry-runs one (driver, test) pair on one environment."""

    def __init__(self, test_driver, task, environment, message_queue):
        """
        :param test_driver: (driver, test) pair to execute, may be empty
        :param task: task providing the shared ``config``
        :param environment: environment to run on, or None
        :param message_queue: queue that receives the ExecuteMessage
        """
        threading.Thread.__init__(self)
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        self.error_message = ""

    def set_thread_id(self, thread_id):
        self.thread_id = thread_id

    def run(self):
        """Dry-run the driver and always post an ExecuteMessage afterwards."""
        from xdevice import Scheduler
        LOG.debug("Thread id: %s start" % self.thread_id)
        start_time = time.time()
        execute_message = ExecuteMessage('', self.environment,
                                         self.test_driver, self.thread_id)
        driver, test = None, None
        try:
            if self.test_driver and Scheduler.is_execute:
                # construct params
                driver, test = self.test_driver
                driver_request = self._get_driver_request(test,
                                                          execute_message)
                if driver_request is None:
                    return

                # setup device
                self._do_task_setup(driver_request)

                # driver execute
                self.reset_device(driver_request.config)
                driver.__dry_run_execute__(driver_request)

        except Exception as exception:
            self.error_message = _report_thread_exception(
                self.environment, exception)

        finally:
            self._handle_finally(driver, execute_message, start_time, test)

    @staticmethod
    def reset_device(config):
        """Reboot every device when ``reboot_per_module`` is configured."""
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    def _handle_finally(self, driver, execute_message, start_time, test):
        """Set the state, free the environment and post the message."""
        from xdevice import Scheduler
        # without a test driver there is no source to report; indexing it
        # unconditionally would fail here and the message would never be
        # queued
        source_content = ""
        if self.test_driver:
            source = self.test_driver[1].source
            source_content = source.source_file or source.source_string
        LOG.info("Executed: %s, Execution Time: %s" % (
            source_content, calculate_elapsed_time(start_time, time.time())))

        # set execute state
        if self.error_message:
            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
        else:
            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

        # free environment
        if self.environment:
            LOG.debug("Thread %s free environment",
                      execute_message.get_thread_id())
            Scheduler.__free_environment__(execute_message.get_environment())

        LOG.debug("Put thread %s result", self.thread_id)
        self.message_queue.put(execute_message)

    def _do_task_setup(self, driver_request):
        """Set up the task kits before the driver runs.

        Skipped in decc mode, when ``check_device`` is configured and when
        the thread has no environment.
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        _setup_task_kits(self.environment, self.task, driver_request)

        # set product_info to self.task
        _sync_product_info(self.task, driver_request)

    def _get_driver_request(self, root_desc, execute_message):
        """Build the Request for the driver and attach it to the message."""
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        if self.listeners:
            for listener in self.listeners:
                LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request


class QueueMonitorThread(threading.Thread):
    """Thread that consumes the ExecuteMessages of the driver threads."""

    def __init__(self, message_queue, current_driver_threads, test_drivers):
        """
        :param message_queue: queue the driver threads post messages to
        :param current_driver_threads: mapping of thread id to the driver
            thread that is currently running
        :param test_drivers: collection of the test drivers not started yet
        """
        threading.Thread.__init__(self)
        self.message_queue = message_queue
        self.current_driver_threads = current_driver_threads
        self.test_drivers = test_drivers

    def run(self):
        """Process messages until no driver is pending or running."""
        from xdevice import Scheduler
        LOG.debug("Queue monitor thread start")
        while self.test_drivers or self.current_driver_threads:
            if not self.current_driver_threads:
                # drivers are pending but none is running yet
                time.sleep(3)
                continue
            execute_message = self.message_queue.get()

            # tolerate a message of an unknown thread instead of dying with
            # KeyError, which would stop the monitoring altogether
            self.current_driver_threads.pop(
                execute_message.get_thread_id(), None)

            if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH:
                LOG.debug("Thread id: %s execute finished" %
                          execute_message.get_thread_id())
            elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR:
                LOG.debug("Thread id: %s execute error" %
                          execute_message.get_thread_id())

            if Scheduler.upload_address:
                Scheduler.upload_module_result(execute_message)

        LOG.debug("Queue monitor thread end")
        if not Scheduler.is_execute:
            LOG.info("Terminate success")
            Scheduler.terminate_result.put("terminate success")


class ExecuteMessage:
    """Message a driver thread posts when its execution has ended."""

    DEVICE_RUN = 'device_run'
    DEVICE_FINISH = 'device_finish'
    DEVICE_ERROR = 'device_error'

    def __init__(self, state, environment, drivers, thread_id):
        """
        :param state: initial state, one of the DEVICE_* values or ''
        :param environment: environment the thread ran on, or None
        :param drivers: the test driver handled by the thread
        :param thread_id: id of the thread that posts the message
        """
        self.state = state
        self.environment = environment
        self.drivers = drivers
        self.thread_id = thread_id
        self.request = None
        self.result = None

    def set_state(self, state):
        self.state = state

    def get_state(self):
        return self.state

    def set_request(self, request):
        self.request = request

    def get_request(self):
        return self.request

    def set_result(self, result):
        self.result = result

    def get_result(self):
        return self.result

    def get_environment(self):
        return self.environment

    def get_thread_id(self):
        return self.thread_id

    def get_drivers(self):
        return self.drivers