#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

from _core.constants import ModeType
from _core.constants import ConfigConst
from _core.constants import ReportConst
from _core.executor.request import Request
from _core.logger import platform_logger
from _core.plugin import Config
from _core.utils import get_instance_name
from _core.utils import check_mode
from _core.exception import ParamError
from _core.exception import ExecuteTerminate
from _core.exception import DeviceError
from _core.exception import LiteDeviceError
from _core.report.reporter_helper import VisionHelper
from _core.report.reporter_helper import ReportConstant
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import Suite
from _core.report.reporter_helper import Case

LOG = platform_logger("Concurrent")


class Concurrent:
    """Helper for running one function over many parameter sets in threads."""

    @classmethod
    def executor_callback(cls, worker):
        """Log the exception of a finished future, if it raised one.

        :param worker: the completed future passed in by add_done_callback
        """
        worker_exception = worker.exception()
        if worker_exception:
            LOG.error("Worker return exception: {}".format(worker_exception))

    @classmethod
    def concurrent_execute(cls, func, params_list, max_size=8):
        """
        Provide the ability to execute target function concurrently
        :param func: target function name
        :param params_list: the list of params in these target functions
        :param max_size: the max size of thread you wanted in thread pool
        :return: list of (result, params) tuples, one per submitted call;
                 collecting a result re-raises the exception of a failed call
        """
        with ThreadPoolExecutor(max_size) as executor:
            future_params = {}
            for params in params_list:
                future = executor.submit(func, *params)
                future_params[future] = params
                future.add_done_callback(cls.executor_callback)
            wait(future_params)  # wait all function complete
            return [(future.result(), params)
                    for future, params in future_params.items()]


class DriversThread(threading.Thread):
    """Thread that executes one (driver, test) pair and reports the outcome.

    The outcome is delivered as an ExecuteMessage put on ``message_queue``.
    """

    def __init__(self, test_driver, task, environment, message_queue):
        super().__init__()
        # (driver, test) pair unpacked in run(); may be falsy
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        # non-empty once run() has caught an exception
        self.error_message = ""

    def set_listeners(self, listeners):
        """Store the listeners and tag each with the first device's sn.

        Tagging is skipped when the thread has no environment.
        """
        self.listeners = listeners
        if self.environment is None:
            return

        for listener in listeners:
            listener.device_sn = self.environment.devices[0].device_sn

    def set_thread_id(self, thread_id):
        """Set the identifier used in log output and in the result message."""
        self.thread_id = thread_id

    def run(self):
        """Build the request, set up devices, execute the driver.

        Any exception is logged and recorded in ``error_message``;
        ``_handle_finally`` always runs afterwards.
        """
        # NOTE(review): local import, presumably to avoid a circular import
        # with the xdevice package - confirm.
        from xdevice import Scheduler
        LOG.debug("Thread id: %s start" % self.thread_id)
        start_time = time.time()
        execute_message = ExecuteMessage('', self.environment,
                                         self.test_driver, self.thread_id)
        driver, test = None, None
        try:
            if self.test_driver and Scheduler.is_execute:
                # construct params
                driver, test = self.test_driver
                driver_request = self._get_driver_request(test,
                                                          execute_message)
                if driver_request is None:
                    return

                # setup device
                self._do_task_setup(driver_request)

                # driver execute
                self.reset_device(driver_request.config)
                driver.__execute__(driver_request)

        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            if self.environment is None:
                LOG.exception("Exception: %s", exception, exc_info=False,
                              error_no=error_no)
            else:
                LOG.exception(
                    "Device: %s, exception: %s" % (
                        self.environment.__get_serial__(), exception),
                    exc_info=False, error_no=error_no)
            self.error_message = "{}: {}".format(
                get_instance_name(exception), str(exception))

        finally:
            self._handle_finally(driver, execute_message, start_time, test)

    @staticmethod
    def reset_device(config):
        """Reboot every device of the environment if reboot_per_module is set.

        :param config: request config; ``config.environment.devices`` is only
                       read when the flag is truthy
        """
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    def _handle_finally(self, driver, execute_message, start_time, test):
        """Finish the run: log duration, collect result, release resources.

        The state update, environment release and queue put are done in a
        ``finally`` clause so that a failure while collecting the result
        cannot leave the environment allocated or the queue without a
        message for this thread.

        :param driver: the executed driver, or None if none was started
        :param execute_message: the ExecuteMessage to complete and enqueue
        :param start_time: value of time.time() taken when run() started
        :param test: the test descriptor, or None if none was started
        """
        from xdevice import Scheduler
        try:
            # output execute time
            end_time = time.time()
            execute_time = VisionHelper.get_execute_time(int(
                end_time - start_time))
            # run() tolerates a falsy test_driver, so must this method
            source_content = ""
            if self.test_driver:
                source = self.test_driver[1].source
                source_content = source.source_file or source.source_string
            LOG.info("Executed: %s, Execution Time: %s" % (
                source_content, execute_time))

            # inherit history report under retry mode
            if driver and test:
                execute_result = driver.__result__()
                LOG.debug("Execute result:%s" % execute_result)
                if getattr(self.task.config, "history_report_path", ""):
                    execute_result = self._inherit_execute_result(
                        execute_result, test)
                execute_message.set_result(execute_result)
        finally:
            # set execute state
            if self.error_message:
                execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
            else:
                execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

            # free environment
            if self.environment:
                LOG.debug("Thread %s free environment",
                          execute_message.get_thread_id())
                Scheduler.__free_environment__(
                    execute_message.get_environment())

            LOG.debug("Put thread %s result", self.thread_id)
            self.message_queue.put(execute_message)
            LOG.info("")

    def _do_task_setup(self, driver_request):
        """Run the task-level kits' setup on every device that needs it.

        Nothing is done in decc mode, when the check_device config flag is
        set, or when the thread has no environment. After a device has been
        processed its need_kit_setup attribute is set to False. Finally the
        request's product info, if it is a dict, is copied to the task when
        the task has none yet.
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        from xdevice import Scheduler
        for device in self.environment.devices:
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device
            kits_copy = copy.deepcopy(self.task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            self.environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # set product_info to self.task
        if getattr(driver_request, ConfigConst.product_info, "") and not \
                getattr(self.task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
                return
            setattr(self.task, ConfigConst.product_info, product_info)

    def _get_driver_request(self, root_desc, execute_message):
        """Create the Request for this run and attach it to the message.

        When the config carries a history_report_path, the test arguments
        are narrowed to the tests recorded as unsuccessful for the module.

        :param root_desc: test descriptor; ``source.module_name`` is read
        :param execute_message: receives the created request
        :return: the Request, or None when a history report exists and it
                 lists no unpassed test for the module
        """
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        history_report_path = getattr(config, "history_report_path", "")
        if history_report_path:
            # modify config.testargs
            module_name = root_desc.source.module_name
            unpassed_test_params = self._get_unpassed_test_params(
                history_report_path, module_name)
            if not unpassed_test_params:
                LOG.info("%s all test cases are passed, no need retry",
                         module_name)
                driver_request = Request(self.thread_id, root_desc,
                                         self.listeners, config)
                execute_message.set_request(driver_request)
                return None
            # a first entry equal to the module name leaves testargs as is
            if unpassed_test_params[0] != module_name and \
                    unpassed_test_params[0] != \
                    str(module_name).split(".")[0]:
                test_args = getattr(config, "testargs", {})
                test_params = []
                for unpassed_test_param in unpassed_test_params:
                    if unpassed_test_param not in test_params:
                        test_params.append(unpassed_test_param)
                test_args["test"] = test_params
                test_args.pop("class", None)
                setattr(config, "testargs", test_args)

        # listeners stays None when set_listeners() was never called
        for listener in self.listeners or []:
            LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request

    @classmethod
    def _get_unpassed_test_params(cls, history_report_path, module_name):
        """Return the tests of ``module_name`` recorded as unsuccessful.

        :param history_report_path: path handed to
                                    ResultReporter.get_task_info_params
        :param module_name: module to look up; if nothing is found the part
                            before its first "." is tried as well
        :return: list of test params, empty when no task info is available
        """
        unpassed_test_params = []
        from _core.report.result_reporter import ResultReporter
        params = ResultReporter.get_task_info_params(history_report_path)
        if not params:
            return unpassed_test_params
        failed_list = []
        try:
            from devicetest.agent.decc import Handler
            if Handler.DAV.retry_select:
                for i in Handler.DAV.case_id_list:
                    failed_list.append(i + "#" + i)
            else:
                failed_list = params[ReportConst.unsuccessful_params].get(
                    module_name, [])
        except Exception:
            # best effort: any failure of the devicetest lookup falls back
            # to the recorded unsuccessful params
            failed_list = params[ReportConst.unsuccessful_params].get(
                module_name, [])
        if not failed_list:
            failed_list = params[ReportConst.unsuccessful_params].get(
                str(module_name).split(".")[0], [])
        unpassed_test_params.extend(failed_list)
        LOG.debug("Get unpassed test params %s", unpassed_test_params)
        return unpassed_test_params

    @classmethod
    def _append_unpassed_test_param(cls, history_report_file,
                                    unpassed_test_params):
        """Append "suite#class#name" for each non-passed case in the report.

        :param history_report_file: report parsed by
                                    DataHelper.parse_data_report
        :param unpassed_test_params: list extended in place
        """
        testsuites_element = DataHelper.parse_data_report(history_report_file)
        for testsuite_element in testsuites_element:
            suite_name = testsuite_element.get("name", "")
            suite = Suite()
            suite.set_cases(testsuite_element)
            for case in suite.cases:
                if case.is_passed():
                    continue
                unpassed_test_param = "{}#{}#{}".format(
                    suite_name, case.classname, case.name)
                unpassed_test_params.append(unpassed_test_param)

    def _inherit_execute_result(self, execute_result, root_desc):
        """Merge the history result of the module into the current result.

        :param execute_result: result reported by the driver
        :param root_desc: test descriptor; ``source.module_name`` is read
        :return: ``execute_result``, or, outside decc mode when
                 ``execute_result`` does not exist on disk, the path of the
                 history result copied into the report's "result" directory
        """
        module_name = root_desc.source.module_name
        execute_result_name = "%s.xml" % module_name
        history_execute_result = self._get_history_execute_result(
            execute_result_name)
        if not history_execute_result:
            LOG.warning("%s no history execute result exists",
                        execute_result_name)
            return execute_result

        if not check_mode(ModeType.decc):
            if not os.path.exists(execute_result):
                result_dir = \
                    os.path.join(self.task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = os.path.join(result_dir,
                                                     execute_result_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("Copy %s to %s" % (history_execute_result,
                                            target_execute_result))
                return target_execute_result

        real_execute_result = self._get_real_execute_result(execute_result)

        # inherit history execute result
        testsuites_element = DataHelper.parse_data_report(real_execute_result)
        if self._is_empty_report(testsuites_element):
            if check_mode(ModeType.decc):
                LOG.info("Empty report no need to inherit history execute"
                         " result")
            else:
                LOG.info("Empty report '%s' no need to inherit history execute"
                         " result", history_execute_result)
            return execute_result

        real_history_execute_result = self._get_real_history_execute_result(
            history_execute_result, module_name)

        history_testsuites_element = DataHelper.parse_data_report(
            real_history_execute_result)
        if self._is_empty_report(history_testsuites_element):
            LOG.info("History report '%s' is empty", history_execute_result)
            return execute_result
        if check_mode(ModeType.decc):
            LOG.info("Inherit history execute result")
        else:
            LOG.info("Inherit history execute result: %s",
                     history_execute_result)
        self._inherit_element(history_testsuites_element, testsuites_element)

        if check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            SuiteReporter.append_report_result(
                (execute_result, DataHelper.to_string(testsuites_element)))
        else:
            # generate inherit execute result
            DataHelper.generate_report(testsuites_element, execute_result)
        return execute_result

    def _inherit_element(self, history_testsuites_element, testsuites_element):
        """Merge history suites into ``testsuites_element`` in place.

        A history suite with no same-named suite in the current report is
        appended whole. Otherwise only its passed cases are appended to the
        matching suite. The "tests" counters are increased accordingly.
        """
        for history_testsuite_element in history_testsuites_element:
            history_testsuite_name = history_testsuite_element.get("name", "")
            target_testsuite_element = None
            for testsuite_element in testsuites_element:
                if history_testsuite_name == testsuite_element.get("name", ""):
                    target_testsuite_element = testsuite_element
                    break

            if target_testsuite_element is None:
                testsuites_element.append(history_testsuite_element)
                inherited_test = int(testsuites_element.get(
                    ReportConstant.tests, 0)) + int(
                    history_testsuite_element.get(ReportConstant.tests, 0))
                testsuites_element.set(ReportConstant.tests,
                                       str(inherited_test))
                continue

            pass_num = 0
            for history_testcase_element in history_testsuite_element:
                if self._check_testcase_pass(history_testcase_element):
                    target_testsuite_element.append(history_testcase_element)
                    pass_num += 1

            inherited_test = int(target_testsuite_element.get(
                ReportConstant.tests, 0)) + pass_num
            target_testsuite_element.set(ReportConstant.tests,
                                         str(inherited_test))
            inherited_test = int(testsuites_element.get(
                ReportConstant.tests, 0)) + pass_num
            testsuites_element.set(ReportConstant.tests, str(inherited_test))

    def _get_history_execute_result(self, execute_result_name):
        """Locate the history result for ``execute_result_name``.

        The task record is consulted first; otherwise the history report
        directory is walked and the last file whose name ends with the
        result name (with or without ".xml") is used.

        :param execute_result_name: result name, a trailing ".xml" is ignored
        :return: path from the record, absolute path of the file found by
                 the walk, or the falsy record lookup result if none matched
        """
        if execute_result_name.endswith(".xml"):
            execute_result_name = execute_result_name[:-4]
        history_execute_result = \
            self._get_data_report_from_record(execute_result_name)
        if history_execute_result:
            return history_execute_result
        # The extension was stripped for the record lookup above; also accept
        # the ".xml" form here, otherwise "<name>.xml" files never match.
        name_suffixes = (execute_result_name,
                         "%s.xml" % execute_result_name)
        for root_dir, _, files in os.walk(
                self.task.config.history_report_path):
            for result_file in files:
                if result_file.endswith(name_suffixes):
                    history_execute_result = os.path.abspath(
                        os.path.join(root_dir, result_file))
        return history_execute_result

    @classmethod
    def _check_testcase_pass(cls, history_testcase_element):
        """Return whether a history testcase element counts as passed.

        A Case is filled from the element's result, status and message
        attributes. If the element has children and no result, the result is
        set to ReportConstant.false and the message is taken from the first
        child. The verdict is ``Case.is_passed()``.
        """
        case = Case()
        case.result = history_testcase_element.get(ReportConstant.result, "")
        case.status = history_testcase_element.get(ReportConstant.status, "")
        case.message = history_testcase_element.get(ReportConstant.message, "")
        if len(history_testcase_element) > 0:
            if not case.result:
                case.result = ReportConstant.false
            case.message = history_testcase_element[0].get(
                ReportConstant.message)

        return case.is_passed()

    @classmethod
    def _is_empty_report(cls, testsuites_element):
        """Return True for a report that carries no usable suite.

        That is: no child at all, or exactly one child whose "unavailable"
        attribute is greater than zero.
        """
        if len(testsuites_element) < 1:
            return True
        if len(testsuites_element) >= 2:
            return False

        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
            return True
        return False

    def _get_data_report_from_record(self, execute_result_name):
        """Look the result name up in the history task's data reports.

        :param execute_result_name: name to look up; the part before its
                                    first "." is tried as a second key
        :return: the recorded value, or "" when there is no history path, no
                 task info or no matching key
        """
        history_report_path = \
            getattr(self.task.config, "history_report_path", "")
        if history_report_path:
            from _core.report.result_reporter import ResultReporter
            params = ResultReporter.get_task_info_params(history_report_path)
            if params:
                report_data_dict = dict(params[ReportConst.data_reports])
                base_name = execute_result_name.split(".")[0]
                if execute_result_name in report_data_dict:
                    return report_data_dict.get(execute_result_name)
                elif base_name in report_data_dict:
                    return report_data_dict.get(base_name)
        return ""

    @classmethod
    def _get_real_execute_result(cls, execute_result):
        """Return the report to parse for ``execute_result``.

        In decc mode this is the entry of SuiteReporter.get_report_result()
        whose name matches ``execute_result`` with extensions ignored, or ""
        if there is none. Otherwise ``execute_result`` itself is returned.
        """
        from xdevice import SuiteReporter
        LOG.debug("Get real execute result length is: %s" %
                  len(SuiteReporter.get_report_result()))
        if check_mode(ModeType.decc):
            for suite_report, report_result in \
                    SuiteReporter.get_report_result():
                if os.path.splitext(suite_report)[0] == \
                        os.path.splitext(execute_result)[0]:
                    return report_result
            return ""
        else:
            return execute_result

    @classmethod
    def _get_real_history_execute_result(cls, history_execute_result,
                                         module_name):
        """Return the history report to parse for ``module_name``.

        In decc mode the result comes from
        SuiteReporter.get_history_result_by_module. Otherwise
        ``history_execute_result`` is returned unchanged.
        """
        from xdevice import SuiteReporter
        LOG.debug("Get real history execute result: %s" %
                  SuiteReporter.history_report_result)
        if check_mode(ModeType.decc):
            _, report_result = SuiteReporter. \
                get_history_result_by_module(module_name)
            return report_result
        else:
            return history_execute_result


class DriversDryRunThread(threading.Thread):
    """Thread that dry-runs one (driver, test) pair.

    Same flow as DriversThread, but calls the driver's
    ``__dry_run_execute__`` and does not collect or inherit a result.
    """

    def __init__(self, test_driver, task, environment, message_queue):
        super().__init__()
        # (driver, test) pair unpacked in run(); may be falsy
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        # non-empty once run() has caught an exception
        self.error_message = ""

    def set_thread_id(self, thread_id):
        """Set the identifier used in log output and in the result message."""
        self.thread_id = thread_id

    def run(self):
        """Build the request, set up devices, dry-run the driver.

        Any exception is logged and recorded in ``error_message``;
        ``_handle_finally`` always runs afterwards.
        """
        from xdevice import Scheduler
        LOG.debug("Thread id: %s start" % self.thread_id)
        start_time = time.time()
        execute_message = ExecuteMessage('', self.environment,
                                         self.test_driver, self.thread_id)
        driver, test = None, None
        try:
            if self.test_driver and Scheduler.is_execute:
                # construct params
                driver, test = self.test_driver
                driver_request = self._get_driver_request(test,
                                                          execute_message)
                if driver_request is None:
                    return

                # setup device
                self._do_task_setup(driver_request)

                # driver execute
                self.reset_device(driver_request.config)
                driver.__dry_run_execute__(driver_request)

        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            if self.environment is None:
                LOG.exception("Exception: %s", exception, exc_info=False,
                              error_no=error_no)
            else:
                LOG.exception(
                    "Device: %s, exception: %s" % (
                        self.environment.__get_serial__(), exception),
                    exc_info=False, error_no=error_no)
            self.error_message = "{}: {}".format(
                get_instance_name(exception), str(exception))

        finally:
            self._handle_finally(driver, execute_message, start_time, test)

    @staticmethod
    def reset_device(config):
        """Reboot every device of the environment if reboot_per_module is set.

        :param config: request config; ``config.environment.devices`` is only
                       read when the flag is truthy
        """
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    def _handle_finally(self, driver, execute_message, start_time, test):
        """Finish the run: log duration, set state, free env, enqueue message.

        :param driver: unused here; kept for signature parity with
                       DriversThread._handle_finally
        :param execute_message: the ExecuteMessage to complete and enqueue
        :param start_time: value of time.time() taken when run() started
        :param test: unused here; kept for signature parity
        """
        from xdevice import Scheduler
        # output execute time
        end_time = time.time()
        execute_time = VisionHelper.get_execute_time(int(
            end_time - start_time))
        # run() tolerates a falsy test_driver, so must this method
        source_content = ""
        if self.test_driver:
            source = self.test_driver[1].source
            source_content = source.source_file or source.source_string
        LOG.info("Executed: %s, Execution Time: %s" % (
            source_content, execute_time))

        # set execute state
        if self.error_message:
            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
        else:
            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

        # free environment
        if self.environment:
            LOG.debug("Thread %s free environment",
                      execute_message.get_thread_id())
            Scheduler.__free_environment__(execute_message.get_environment())

        LOG.debug("Put thread %s result", self.thread_id)
        self.message_queue.put(execute_message)

    def _do_task_setup(self, driver_request):
        """Run the task-level kits' setup on every device that needs it.

        Nothing is done in decc mode, when the check_device config flag is
        set, or when the thread has no environment. After a device has been
        processed its need_kit_setup attribute is set to False. Finally the
        request's product info, if it is a dict, is copied to the task when
        the task has none yet.
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        from xdevice import Scheduler
        for device in self.environment.devices:
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device
            kits_copy = copy.deepcopy(self.task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            self.environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # set product_info to self.task
        if getattr(driver_request, ConfigConst.product_info, "") and not \
                getattr(self.task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
                return
            setattr(self.task, ConfigConst.product_info, product_info)

    def _get_driver_request(self, root_desc, execute_message):
        """Create the Request for this run and attach it to the message.

        :param root_desc: test descriptor passed on to the Request
        :param execute_message: receives the created request
        :return: the created Request
        """
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        if self.listeners:
            for listener in self.listeners:
                LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request


class QueueMonitorThread(threading.Thread):
    """Thread that consumes ExecuteMessages until all drivers are done."""

    def __init__(self, message_queue, current_driver_threads, test_drivers):
        super().__init__()
        self.message_queue = message_queue
        # mapping popped by thread id as each result message arrives
        self.current_driver_threads = current_driver_threads
        # loop keeps running while this is truthy
        self.test_drivers = test_drivers

    def run(self):
        """Consume result messages while drivers are pending or running.

        Each message removes its thread from ``current_driver_threads`` and,
        if Scheduler.upload_address is set, is handed to
        Scheduler.upload_module_result. When the loop ends while
        Scheduler.is_execute is falsy, "terminate success" is put on
        Scheduler.terminate_result.
        """
        from xdevice import Scheduler
        LOG.debug("Queue monitor thread start")
        while self.test_drivers or self.current_driver_threads:
            if not self.current_driver_threads:
                # nothing running yet; poll again later
                time.sleep(3)
                continue
            execute_message = self.message_queue.get()

            self.current_driver_threads.pop(execute_message.get_thread_id())

            if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH:
                LOG.debug("Thread id: %s execute finished" %
                          execute_message.get_thread_id())
            elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR:
                LOG.debug("Thread id: %s execute error" %
                          execute_message.get_thread_id())

            if Scheduler.upload_address:
                Scheduler.upload_module_result(execute_message)

        LOG.debug("Queue monitor thread end")
        if not Scheduler.is_execute:
            LOG.info("Terminate success")
            Scheduler.terminate_result.put("terminate success")


class ExecuteMessage:
    """Result record a driver thread puts on the message queue."""

    DEVICE_RUN = 'device_run'
    DEVICE_FINISH = 'device_finish'
    DEVICE_ERROR = 'device_error'

    def __init__(self, state, environment, drivers, thread_id):
        self.state = state
        self.environment = environment
        self.drivers = drivers
        self.thread_id = thread_id
        self.request = None
        self.result = None

    def set_state(self, state):
        """Set the execution state (one of the DEVICE_* constants)."""
        self.state = state

    def get_state(self):
        """Return the execution state."""
        return self.state

    def set_request(self, request):
        """Attach the request built for the run."""
        self.request = request

    def get_request(self):
        """Return the attached request, or None if none was set."""
        return self.request

    def set_result(self, result):
        """Attach the execute result of the run."""
        self.result = result

    def get_result(self):
        """Return the attached result, or None if none was set."""
        return self.result

    def get_environment(self):
        """Return the environment the run was given."""
        return self.environment

    def get_thread_id(self):
        """Return the id of the thread that produced this message."""
        return self.thread_id

    def get_drivers(self):
        """Return the drivers value passed at construction."""
        return self.drivers