1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import os 21import shutil 22import threading 23import time 24from concurrent.futures import ThreadPoolExecutor 25from concurrent.futures import wait 26 27from _core.constants import ModeType 28from _core.constants import ConfigConst 29from _core.constants import ReportConst 30from _core.executor.request import Request 31from _core.logger import platform_logger 32from _core.plugin import Config 33from _core.utils import get_instance_name 34from _core.utils import check_mode 35from _core.exception import ParamError 36from _core.exception import ExecuteTerminate 37from _core.exception import DeviceError 38from _core.exception import LiteDeviceError 39from _core.report.reporter_helper import VisionHelper 40from _core.report.reporter_helper import ReportConstant 41from _core.report.reporter_helper import DataHelper 42from _core.report.reporter_helper import Suite 43from _core.report.reporter_helper import Case 44 45LOG = platform_logger("Concurrent") 46 47 48class Concurrent: 49 @classmethod 50 def executor_callback(cls, worker): 51 worker_exception = worker.exception() 52 if worker_exception: 53 LOG.error("Worker return exception: {}".format(worker_exception)) 54 55 @classmethod 56 def concurrent_execute(cls, func, params_list, max_size=8): 57 """ 58 Provider the ability to execute target function concurrently 59 :param func: target function name 60 :param params_list: the list of params in these target functions 61 :param max_size: the max size of thread you wanted in thread pool 62 :return: 63 """ 64 with ThreadPoolExecutor(max_size) as executor: 65 future_params = dict() 66 for params in params_list: 67 future = executor.submit(func, *params) 68 future_params.update({future: params}) 69 future.add_done_callback(cls.executor_callback) 70 wait(future_params) # wait all function complete 71 result_list = [] 72 for future in future_params: 73 result_list.append((future.result(), future_params[future])) 74 return result_list 75 76 77class DriversThread(threading.Thread): 78 def __init__(self, test_driver, task, environment, message_queue): 79 threading.Thread.__init__(self) 80 self.test_driver = test_driver 81 self.listeners = None 82 self.task = task 83 self.environment = environment 84 self.message_queue = message_queue 85 self.thread_id = None 86 self.error_message = "" 87 88 def set_listeners(self, listeners): 89 self.listeners = listeners 90 if self.environment is None: 91 return 92 93 for listener in listeners: 94 listener.device_sn = self.environment.devices[0].device_sn 95 96 def set_thread_id(self, thread_id): 97 self.thread_id = thread_id 98 99 def run(self): 100 from xdevice import Scheduler 101 LOG.debug("Thread id: %s start" % self.thread_id) 102 start_time = time.time() 103 execute_message = ExecuteMessage('', self.environment, 104 self.test_driver, self.thread_id) 105 driver, test = None, None 106 try: 107 if self.test_driver and Scheduler.is_execute: 108 # construct params 109 driver, test = self.test_driver 110 driver_request = self._get_driver_request(test, 111 execute_message) 112 if driver_request is None: 113 return 114 115 # setup device 116 self._do_task_setup(driver_request) 117 118 # driver execute 119 self.reset_device(driver_request.config) 120 driver.__execute__(driver_request) 121 122 except Exception as exception: 123 error_no = getattr(exception, "error_no", "00000") 124 if self.environment is None: 125 LOG.exception("Exception: %s", exception, exc_info=False, 126 error_no=error_no) 127 else: 128 LOG.exception( 129 "Device: %s, exception: %s" % ( 130 self.environment.__get_serial__(), exception), 131 exc_info=False, error_no=error_no) 132 self.error_message = "{}: {}".format( 133 get_instance_name(exception), str(exception)) 134 135 finally: 136 self._handle_finally(driver, execute_message, start_time, test) 137 138 @staticmethod 139 def reset_device(config): 140 if getattr(config, "reboot_per_module", False): 141 for device in config.environment.devices: 142 device.reboot() 143 144 def _handle_finally(self, driver, execute_message, start_time, test): 145 from xdevice import Scheduler 146 # output execute time 147 end_time = time.time() 148 execute_time = VisionHelper.get_execute_time(int( 149 end_time - start_time)) 150 source_content = self.test_driver[1].source.source_file or \ 151 self.test_driver[1].source.source_string 152 LOG.info("Executed: %s, Execution Time: %s" % ( 153 source_content, execute_time)) 154 155 # inherit history report under retry mode 156 if driver and test: 157 execute_result = driver.__result__() 158 LOG.debug("Execute result:%s" % execute_result) 159 if getattr(self.task.config, "history_report_path", ""): 160 execute_result = self._inherit_execute_result( 161 execute_result, test) 162 execute_message.set_result(execute_result) 163 164 # set execute state 165 if self.error_message: 166 execute_message.set_state(ExecuteMessage.DEVICE_ERROR) 167 else: 168 execute_message.set_state(ExecuteMessage.DEVICE_FINISH) 169 170 # free environment 171 if self.environment: 172 LOG.debug("Thread %s free environment", 173 execute_message.get_thread_id()) 174 Scheduler.__free_environment__(execute_message.get_environment()) 175 176 LOG.debug("Put thread %s result", self.thread_id) 177 self.message_queue.put(execute_message) 178 LOG.info("") 179 180 def _do_task_setup(self, driver_request): 181 if check_mode(ModeType.decc) or getattr( 182 driver_request.config, ConfigConst.check_device, False): 183 return 184 185 if self.environment is None: 186 return 187 188 from xdevice import Scheduler 189 for device in self.environment.devices: 190 if not getattr(device, ConfigConst.need_kit_setup, True): 191 LOG.debug("Device %s need kit setup is false" % device) 192 continue 193 194 # do task setup for device 195 kits_copy = copy.deepcopy(self.task.config.kits) 196 setattr(device, ConfigConst.task_kits, kits_copy) 197 for kit in getattr(device, ConfigConst.task_kits, []): 198 if not Scheduler.is_execute: 199 break 200 try: 201 kit.__setup__(device, request=driver_request) 202 except (ParamError, ExecuteTerminate, DeviceError, 203 LiteDeviceError, ValueError, TypeError, 204 SyntaxError, AttributeError) as exception: 205 error_no = getattr(exception, "error_no", "00000") 206 LOG.exception( 207 "Task setup device: %s, exception: %s" % ( 208 self.environment.__get_serial__(), 209 exception), exc_info=False, error_no=error_no) 210 LOG.debug("Set device %s need kit setup to false" % device) 211 setattr(device, ConfigConst.need_kit_setup, False) 212 213 # set product_info to self.task 214 if getattr(driver_request, ConfigConst.product_info, "") and not \ 215 getattr(self.task, ConfigConst.product_info, ""): 216 product_info = getattr(driver_request, ConfigConst.product_info) 217 if not isinstance(product_info, dict): 218 LOG.warning("Product info should be dict, %s", 219 product_info) 220 return 221 setattr(self.task, ConfigConst.product_info, product_info) 222 223 def _get_driver_request(self, root_desc, execute_message): 224 config = Config() 225 config.update(copy.deepcopy(self.task.config).__dict__) 226 config.environment = self.environment 227 if getattr(config, "history_report_path", ""): 228 # modify config.testargs 229 history_report_path = getattr(config, "history_report_path", "") 230 module_name = root_desc.source.module_name 231 unpassed_test_params = self._get_unpassed_test_params( 232 history_report_path, module_name) 233 if not unpassed_test_params: 234 LOG.info("%s all test cases are passed, no need retry", 235 module_name) 236 driver_request = Request(self.thread_id, root_desc, 237 self.listeners, config) 238 execute_message.set_request(driver_request) 239 return None 240 if unpassed_test_params[0] != module_name and \ 241 unpassed_test_params[0] != str(module_name).split(".")[0]: 242 test_args = getattr(config, "testargs", {}) 243 test_params = [] 244 for unpassed_test_param in unpassed_test_params: 245 if unpassed_test_param not in test_params: 246 test_params.append(unpassed_test_param) 247 test_args["test"] = test_params 248 if "class" in test_args.keys(): 249 test_args.pop("class") 250 setattr(config, "testargs", test_args) 251 if getattr(config, "tf_suite", ""): 252 if root_desc.source.module_name in config.tf_suite.keys(): 253 config.tf_suite = config.tf_suite.get( 254 root_desc.source.module_name) 255 else: 256 config.tf_suite = dict() 257 for listener in self.listeners: 258 LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener)) 259 driver_request = Request(self.thread_id, root_desc, self.listeners, 260 config) 261 execute_message.set_request(driver_request) 262 return driver_request 263 264 @classmethod 265 def _get_unpassed_test_params(cls, history_report_path, module_name): 266 unpassed_test_params = [] 267 from _core.report.result_reporter import ResultReporter 268 params = ResultReporter.get_task_info_params(history_report_path) 269 if not params: 270 return unpassed_test_params 271 failed_list = [] 272 try: 273 from devicetest.agent.decc import Handler 274 if Handler.DAV.retry_select: 275 for i in Handler.DAV.case_id_list: 276 failed_list.append(i + "#" + i) 277 else: 278 failed_list = params[ReportConst.unsuccessful_params].get(module_name, []) 279 except Exception: 280 failed_list = params[ReportConst.unsuccessful_params].get(module_name, []) 281 if not failed_list: 282 failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], []) 283 unpassed_test_params.extend(failed_list) 284 LOG.debug("Get unpassed test params %s", unpassed_test_params) 285 return unpassed_test_params 286 287 @classmethod 288 def _append_unpassed_test_param(cls, history_report_file, 289 unpassed_test_params): 290 291 testsuites_element = DataHelper.parse_data_report(history_report_file) 292 for testsuite_element in testsuites_element: 293 suite_name = testsuite_element.get("name", "") 294 suite = Suite() 295 suite.set_cases(testsuite_element) 296 for case in suite.cases: 297 if case.is_passed(): 298 continue 299 unpassed_test_param = "{}#{}#{}".format( 300 suite_name, case.classname, case.name) 301 unpassed_test_params.append(unpassed_test_param) 302 303 def _inherit_execute_result(self, execute_result, root_desc): 304 module_name = root_desc.source.module_name 305 execute_result_name = "%s.xml" % module_name 306 history_execute_result = self._get_history_execute_result( 307 execute_result_name) 308 if not history_execute_result: 309 LOG.warning("%s no history execute result exists", 310 execute_result_name) 311 return execute_result 312 313 if not check_mode(ModeType.decc): 314 if not os.path.exists(execute_result): 315 result_dir = \ 316 os.path.join(self.task.config.report_path, "result") 317 os.makedirs(result_dir, exist_ok=True) 318 target_execute_result = os.path.join(result_dir, 319 execute_result_name) 320 shutil.copyfile(history_execute_result, target_execute_result) 321 LOG.info("Copy %s to %s" % (history_execute_result, 322 target_execute_result)) 323 return target_execute_result 324 325 real_execute_result = self._get_real_execute_result(execute_result) 326 327 # inherit history execute result 328 testsuites_element = DataHelper.parse_data_report(real_execute_result) 329 if self._is_empty_report(testsuites_element): 330 if check_mode(ModeType.decc): 331 LOG.info("Empty report no need to inherit history execute" 332 " result") 333 else: 334 LOG.info("Empty report '%s' no need to inherit history execute" 335 " result", history_execute_result) 336 return execute_result 337 338 real_history_execute_result = self._get_real_history_execute_result( 339 history_execute_result, module_name) 340 341 history_testsuites_element = DataHelper.parse_data_report( 342 real_history_execute_result) 343 if self._is_empty_report(history_testsuites_element): 344 LOG.info("History report '%s' is empty", history_execute_result) 345 return execute_result 346 if check_mode(ModeType.decc): 347 LOG.info("Inherit history execute result") 348 else: 349 LOG.info("Inherit history execute result: %s", 350 history_execute_result) 351 self._inherit_element(history_testsuites_element, testsuites_element) 352 353 if check_mode(ModeType.decc): 354 from xdevice import SuiteReporter 355 SuiteReporter.append_report_result( 356 (execute_result, DataHelper.to_string(testsuites_element))) 357 else: 358 # generate inherit execute result 359 DataHelper.generate_report(testsuites_element, execute_result) 360 return execute_result 361 362 def _inherit_element(self, history_testsuites_element, testsuites_element): 363 for history_testsuite_element in history_testsuites_element: 364 history_testsuite_name = history_testsuite_element.get("name", "") 365 target_testsuite_element = None 366 for testsuite_element in testsuites_element: 367 if history_testsuite_name == testsuite_element.get("name", ""): 368 target_testsuite_element = testsuite_element 369 break 370 371 if target_testsuite_element is None: 372 testsuites_element.append(history_testsuite_element) 373 inherited_test = int(testsuites_element.get( 374 ReportConstant.tests, 0)) + int( 375 history_testsuite_element.get(ReportConstant.tests, 0)) 376 testsuites_element.set(ReportConstant.tests, 377 str(inherited_test)) 378 continue 379 380 pass_num = 0 381 for history_testcase_element in history_testsuite_element: 382 if self._check_testcase_pass(history_testcase_element): 383 target_testsuite_element.append(history_testcase_element) 384 pass_num += 1 385 386 inherited_test = int(target_testsuite_element.get( 387 ReportConstant.tests, 0)) + pass_num 388 target_testsuite_element.set(ReportConstant.tests, 389 str(inherited_test)) 390 inherited_test = int(testsuites_element.get( 391 ReportConstant.tests, 0)) + pass_num 392 testsuites_element.set(ReportConstant.tests, str(inherited_test)) 393 394 def _get_history_execute_result(self, execute_result_name): 395 if execute_result_name.endswith(".xml"): 396 execute_result_name = execute_result_name[:-4] 397 history_execute_result = \ 398 self._get_data_report_from_record(execute_result_name) 399 if history_execute_result: 400 return history_execute_result 401 for root_dir, _, files in os.walk( 402 self.task.config.history_report_path): 403 for result_file in files: 404 if result_file.endswith(execute_result_name): 405 history_execute_result = os.path.abspath( 406 os.path.join(root_dir, result_file)) 407 return history_execute_result 408 409 @classmethod 410 def _check_testcase_pass(cls, history_testcase_element): 411 case = Case() 412 case.result = history_testcase_element.get(ReportConstant.result, "") 413 case.status = history_testcase_element.get(ReportConstant.status, "") 414 case.message = history_testcase_element.get(ReportConstant.message, "") 415 if len(history_testcase_element) > 0: 416 if not case.result: 417 case.result = ReportConstant.false 418 case.message = history_testcase_element[0].get( 419 ReportConstant.message) 420 421 return case.is_passed() 422 423 @classmethod 424 def _is_empty_report(cls, testsuites_element): 425 if len(testsuites_element) < 1: 426 return True 427 if len(testsuites_element) >= 2: 428 return False 429 430 if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0: 431 return True 432 return False 433 434 def _get_data_report_from_record(self, execute_result_name): 435 history_report_path = \ 436 getattr(self.task.config, "history_report_path", "") 437 if history_report_path: 438 from _core.report.result_reporter import ResultReporter 439 params = ResultReporter.get_task_info_params(history_report_path) 440 if params: 441 report_data_dict = dict(params[ReportConst.data_reports]) 442 if execute_result_name in report_data_dict.keys(): 443 return report_data_dict.get(execute_result_name) 444 elif execute_result_name.split(".")[0] in \ 445 report_data_dict.keys(): 446 return report_data_dict.get( 447 execute_result_name.split(".")[0]) 448 return "" 449 450 @classmethod 451 def _get_real_execute_result(cls, execute_result): 452 from xdevice import SuiteReporter 453 LOG.debug("Get real execute result length is: %s" % 454 len(SuiteReporter.get_report_result())) 455 if check_mode(ModeType.decc): 456 for suite_report, report_result in \ 457 SuiteReporter.get_report_result(): 458 if os.path.splitext(suite_report)[0] == \ 459 os.path.splitext(execute_result)[0]: 460 return report_result 461 return "" 462 else: 463 return execute_result 464 465 @classmethod 466 def _get_real_history_execute_result(cls, history_execute_result, 467 module_name): 468 from xdevice import SuiteReporter 469 LOG.debug("Get real history execute result: %s" % 470 SuiteReporter.history_report_result) 471 if check_mode(ModeType.decc): 472 virtual_report_path, report_result = SuiteReporter. \ 473 get_history_result_by_module(module_name) 474 return report_result 475 else: 476 return history_execute_result 477 478 479class DriversDryRunThread(threading.Thread): 480 def __init__(self, test_driver, task, environment, message_queue): 481 threading.Thread.__init__(self) 482 self.test_driver = test_driver 483 self.listeners = None 484 self.task = task 485 self.environment = environment 486 self.message_queue = message_queue 487 self.thread_id = None 488 self.error_message = "" 489 490 def set_thread_id(self, thread_id): 491 self.thread_id = thread_id 492 493 def run(self): 494 from xdevice import Scheduler 495 LOG.debug("Thread id: %s start" % self.thread_id) 496 start_time = time.time() 497 execute_message = ExecuteMessage('', self.environment, 498 self.test_driver, self.thread_id) 499 driver, test = None, None 500 try: 501 if self.test_driver and Scheduler.is_execute: 502 # construct params 503 driver, test = self.test_driver 504 driver_request = self._get_driver_request(test, 505 execute_message) 506 if driver_request is None: 507 return 508 509 # setup device 510 self._do_task_setup(driver_request) 511 512 # driver execute 513 self.reset_device(driver_request.config) 514 driver.__dry_run_execute__(driver_request) 515 516 except Exception as exception: 517 error_no = getattr(exception, "error_no", "00000") 518 if self.environment is None: 519 LOG.exception("Exception: %s", exception, exc_info=False, 520 error_no=error_no) 521 else: 522 LOG.exception( 523 "Device: %s, exception: %s" % ( 524 self.environment.__get_serial__(), exception), 525 exc_info=False, error_no=error_no) 526 self.error_message = "{}: {}".format( 527 get_instance_name(exception), str(exception)) 528 529 finally: 530 self._handle_finally(driver, execute_message, start_time, test) 531 532 @staticmethod 533 def reset_device(config): 534 if getattr(config, "reboot_per_module", False): 535 for device in config.environment.devices: 536 device.reboot() 537 538 def _handle_finally(self, driver, execute_message, start_time, test): 539 from xdevice import Scheduler 540 # output execute time 541 end_time = time.time() 542 execute_time = VisionHelper.get_execute_time(int( 543 end_time - start_time)) 544 source_content = self.test_driver[1].source.source_file or \ 545 self.test_driver[1].source.source_string 546 LOG.info("Executed: %s, Execution Time: %s" % ( 547 source_content, execute_time)) 548 549 # set execute state 550 if self.error_message: 551 execute_message.set_state(ExecuteMessage.DEVICE_ERROR) 552 else: 553 execute_message.set_state(ExecuteMessage.DEVICE_FINISH) 554 555 # free environment 556 if self.environment: 557 LOG.debug("Thread %s free environment", 558 execute_message.get_thread_id()) 559 Scheduler.__free_environment__(execute_message.get_environment()) 560 561 LOG.debug("Put thread %s result", self.thread_id) 562 self.message_queue.put(execute_message) 563 564 def _do_task_setup(self, driver_request): 565 if check_mode(ModeType.decc) or getattr( 566 driver_request.config, ConfigConst.check_device, False): 567 return 568 569 if self.environment is None: 570 return 571 572 from xdevice import Scheduler 573 for device in self.environment.devices: 574 if not getattr(device, ConfigConst.need_kit_setup, True): 575 LOG.debug("Device %s need kit setup is false" % device) 576 continue 577 578 # do task setup for device 579 kits_copy = copy.deepcopy(self.task.config.kits) 580 setattr(device, ConfigConst.task_kits, kits_copy) 581 for kit in getattr(device, ConfigConst.task_kits, []): 582 if not Scheduler.is_execute: 583 break 584 try: 585 kit.__setup__(device, request=driver_request) 586 except (ParamError, ExecuteTerminate, DeviceError, 587 LiteDeviceError, ValueError, TypeError, 588 SyntaxError, AttributeError) as exception: 589 error_no = getattr(exception, "error_no", "00000") 590 LOG.exception( 591 "Task setup device: %s, exception: %s" % ( 592 self.environment.__get_serial__(), 593 exception), exc_info=False, error_no=error_no) 594 LOG.debug("Set device %s need kit setup to false" % device) 595 setattr(device, ConfigConst.need_kit_setup, False) 596 597 # set product_info to self.task 598 if getattr(driver_request, ConfigConst.product_info, "") and not \ 599 getattr(self.task, ConfigConst.product_info, ""): 600 product_info = getattr(driver_request, ConfigConst.product_info) 601 if not isinstance(product_info, dict): 602 LOG.warning("Product info should be dict, %s", 603 product_info) 604 return 605 setattr(self.task, ConfigConst.product_info, product_info) 606 607 def _get_driver_request(self, root_desc, execute_message): 608 config = Config() 609 config.update(copy.deepcopy(self.task.config).__dict__) 610 config.environment = self.environment 611 if self.listeners: 612 for listener in self.listeners: 613 LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener)) 614 driver_request = Request(self.thread_id, root_desc, self.listeners, 615 config) 616 execute_message.set_request(driver_request) 617 return driver_request 618 619 620class QueueMonitorThread(threading.Thread): 621 622 def __init__(self, message_queue, current_driver_threads, test_drivers): 623 threading.Thread.__init__(self) 624 self.message_queue = message_queue 625 self.current_driver_threads = current_driver_threads 626 self.test_drivers = test_drivers 627 628 def run(self): 629 from xdevice import Scheduler 630 LOG.debug("Queue monitor thread start") 631 while self.test_drivers or self.current_driver_threads: 632 if not self.current_driver_threads: 633 time.sleep(3) 634 continue 635 execute_message = self.message_queue.get() 636 637 self.current_driver_threads.pop(execute_message.get_thread_id()) 638 639 if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH: 640 LOG.debug("Thread id: %s execute finished" % 641 execute_message.get_thread_id()) 642 elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR: 643 LOG.debug("Thread id: %s execute error" % 644 execute_message.get_thread_id()) 645 646 if Scheduler.upload_address: 647 Scheduler.upload_module_result(execute_message) 648 649 LOG.debug("Queue monitor thread end") 650 if not Scheduler.is_execute: 651 LOG.info("Terminate success") 652 Scheduler.terminate_result.put("terminate success") 653 654 655class ExecuteMessage: 656 DEVICE_RUN = 'device_run' 657 DEVICE_FINISH = 'device_finish' 658 DEVICE_ERROR = 'device_error' 659 660 def __init__(self, state, environment, drivers, thread_id): 661 self.state = state 662 self.environment = environment 663 self.drivers = drivers 664 self.thread_id = thread_id 665 self.request = None 666 self.result = None 667 668 def set_state(self, state): 669 self.state = state 670 671 def get_state(self): 672 return self.state 673 674 def set_request(self, request): 675 self.request = request 676 677 def get_request(self): 678 return self.request 679 680 def set_result(self, result): 681 self.result = result 682 683 def get_result(self): 684 return self.result 685 686 def get_environment(self): 687 return self.environment 688 689 def get_thread_id(self): 690 return self.thread_id 691 692 def get_drivers(self): 693 return self.drivers 694