#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

from _core.constants import ModeType
from _core.constants import ConfigConst
from _core.executor.request import Request
from _core.logger import platform_logger
from _core.plugin import Config
from _core.utils import get_instance_name
from _core.utils import get_filename_extension
from _core.utils import check_mode
from _core.exception import ParamError
from _core.exception import ExecuteTerminate
from _core.exception import DeviceError
from _core.exception import LiteDeviceError
from _core.report.reporter_helper import VisionHelper
from _core.report.reporter_helper import ReportConstant
from _core.report.reporter_helper import DataHelper
from _core.report.reporter_helper import Suite
from _core.report.reporter_helper import Case

LOG = platform_logger("Concurrent")


class Concurrent:
    """Helpers for running a function concurrently in a thread pool."""

    @classmethod
    def executor_callback(cls, worker):
        """
        Done-callback attached to every submitted future; logs the
        exception raised by the worker, if any
        :param worker: the finished future
        :return:
        """
        worker_exception = worker.exception()
        if worker_exception:
            LOG.error("Worker return exception: {}".format(worker_exception))

    @classmethod
    def concurrent_execute(cls, func, params_list, max_size=8):
        """
        Provide the ability to execute target function concurrently
        :param func: target function name
        :param params_list: the list of params in these target functions,
            each item is unpacked as the positional arguments of one call
        :param max_size: the max size of thread you wanted in thread pool
        :return: list of (result, params) tuples, in submission order;
            an exception raised by func is re-raised by future.result()
        """
        with ThreadPoolExecutor(max_size) as executor:
            future_params = dict()
            for params in params_list:
                future = executor.submit(func, *params)
                future_params[future] = params
                future.add_done_callback(cls.executor_callback)
            wait(future_params)  # wait all function complete
            return [(future.result(), params)
                    for future, params in future_params.items()]


class DriversThread(threading.Thread):
    """
    Thread that executes one (driver, test) pair on an environment and
    puts an ExecuteMessage describing the outcome into the message queue
    """

    def __init__(self, test_driver, task, environment, message_queue):
        """
        :param test_driver: (driver, test descriptor) pair, may be falsy
        :param task: the task whose config is used to build the request
        :param environment: the environment to run on, may be None
        :param message_queue: queue that receives the ExecuteMessage
        """
        threading.Thread.__init__(self)
        self.test_driver = test_driver
        self.listeners = None
        self.task = task
        self.environment = environment
        self.message_queue = message_queue
        self.thread_id = None
        self.error_message = ""

    def set_listeners(self, listeners):
        """
        Store the listeners and, when an environment is present, stamp each
        listener with the device_sn of the environment's first device
        :param listeners: listeners passed to the driver request
        :return:
        """
        self.listeners = listeners
        if self.environment is None:
            return

        for listener in listeners:
            listener.device_sn = self.environment.devices[0].device_sn

    def set_thread_id(self, thread_id):
        """
        :param thread_id: identifier reported in the ExecuteMessage
        :return:
        """
        self.thread_id = thread_id

    def run(self):
        """
        Build the driver request, do the task setup and execute the driver.
        Any exception is logged and recorded in self.error_message; the
        finalisation in _handle_finally always runs
        :return:
        """
        from xdevice import Scheduler
        LOG.debug("thread id: %s start" % self.thread_id)
        start_time = time.time()
        execute_message = ExecuteMessage('', self.environment,
                                         self.test_driver, self.thread_id)
        driver, test = None, None
        try:
            if self.test_driver and Scheduler.is_execute:
                # construct params
                driver, test = self.test_driver
                driver_request = self._get_driver_request(test,
                                                          execute_message)
                if driver_request is None:
                    return

                # setup device
                self._do_task_setup(driver_request)

                # driver execute
                self.reset_device(driver_request.config)
                driver.__execute__(driver_request)

        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            if self.environment is None:
                LOG.exception("exception: %s", exception, exc_info=False,
                              error_no=error_no)
            else:
                LOG.exception(
                    "device: %s, exception: %s" % (
                        self.environment.__get_serial__(), exception),
                    exc_info=False, error_no=error_no)
            self.error_message = "{}: {}".format(
                get_instance_name(exception), str(exception))

        finally:
            self._handle_finally(driver, execute_message, start_time, test)

    @staticmethod
    def reset_device(config):
        """
        Reboot every device of config.environment when the config has a
        truthy "reboot_per_module" attribute
        :param config: the driver request config
        :return:
        """
        if getattr(config, "reboot_per_module", False):
            for device in config.environment.devices:
                device.reboot()

    def _handle_finally(self, driver, execute_message, start_time, test):
        """
        Finalise one execution: record the result, set the execute state,
        free the environment and put the message into the message queue.

        The state, free and put steps run inside "finally" sections so
        that an error while collecting the result can neither leak the
        environment nor leave the message queue without a message for
        this thread
        :param driver: the executed driver, None if nothing was executed
        :param execute_message: the message to fill and publish
        :param start_time: time.time() value taken when run() started
        :param test: the test descriptor, None if nothing was executed
        :return:
        """
        from xdevice import Scheduler
        try:
            self._record_execute_result(driver, execute_message, start_time,
                                        test)
        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            LOG.exception("handle execute result exception: %s", exception,
                          exc_info=False, error_no=error_no)
            # keep the first error, the one raised by the execution itself
            if not self.error_message:
                self.error_message = "{}: {}".format(
                    get_instance_name(exception), str(exception))
        finally:
            # set execute state
            if self.error_message:
                execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
            else:
                execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

            try:
                # free environment
                if self.environment:
                    LOG.debug("thread %s free environment",
                              execute_message.get_thread_id())
                    Scheduler.__free_environment__(
                        execute_message.get_environment())
            finally:
                LOG.debug("put thread %s result", self.thread_id)
                self.message_queue.put(execute_message)
                LOG.info("")

    def _record_execute_result(self, driver, execute_message, start_time,
                               test):
        """
        Log the execution time and store the driver result (merged with
        the history result under retry mode) in the execute message
        """
        # output execute time
        end_time = time.time()
        execute_time = VisionHelper.get_execute_time(int(
            end_time - start_time))
        source_content = ""
        # run() accepts a falsy test_driver, so it must not be indexed
        # unconditionally here
        if self.test_driver:
            source = self.test_driver[1].source
            source_content = source.source_file or source.source_string
        LOG.info("Executed: %s, Execution Time: %s" % (
            source_content, execute_time))

        # inherit history report under retry mode
        if driver and test:
            execute_result = driver.__result__()
            LOG.debug("execute_result:%s" % execute_result)
            if getattr(self.task.config, "history_report_path", ""):
                execute_result = self._inherit_execute_result(
                    execute_result, test)
            execute_message.set_result(execute_result)

    def _do_task_setup(self, driver_request):
        """
        Run the task kits on every device of the environment, once per
        device, then copy the product info of the request to the task.

        Nothing is done in decc mode, when the request config has a truthy
        check_device attribute, or when there is no environment. The listed
        kit setup exceptions are logged and not re-raised
        :param driver_request: the request handed to every kit setup
        :return:
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        from xdevice import Scheduler
        for device in self.environment.devices:
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("device %s need kit setup is false" % device)
                continue

            # do task setup for device
            kits_copy = copy.deepcopy(self.task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "task setup device: %s, exception: %s" % (
                            self.environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
            LOG.debug("set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # set product_info to self.task
        if getattr(driver_request, ConfigConst.product_info, "") and not \
                getattr(self.task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("product info should be dict, %s",
                            product_info)
                return
            setattr(self.task, ConfigConst.product_info, product_info)

    def _get_driver_request(self, root_desc, execute_message):
        """
        Build the Request for the driver from a deep copy of the task
        config and store it in the execute message.

        Under retry mode (config has "history_report_path") the "test"
        entry of config.testargs is replaced by the unpassed test params
        of the module, unless the first param is the module name itself
        :param root_desc: the test descriptor of the module
        :param execute_message: the message that receives the request
        :return: the request, or None under retry mode when the module
            has no unpassed test params
        """
        config = Config()
        config.update(copy.deepcopy(self.task.config).__dict__)
        config.environment = self.environment
        if getattr(config, "history_report_path", ""):
            # modify config.testargs
            history_report_path = getattr(config, "history_report_path", "")
            module_name = root_desc.source.module_name
            unpassed_test_params = self._get_unpassed_test_params(
                history_report_path, module_name)
            if not unpassed_test_params:
                LOG.info("%s all test cases are passed, no need retry",
                         module_name)
                driver_request = Request(self.thread_id, root_desc,
                                         self.listeners, config)
                execute_message.set_request(driver_request)
                return None
            if unpassed_test_params[0] != module_name and \
                    unpassed_test_params[0] != str(module_name).split(".")[0]:
                test_args = getattr(config, "testargs", {})
                # drop duplicates, keep the first occurrence order
                test_params = []
                for unpassed_test_param in unpassed_test_params:
                    if unpassed_test_param not in test_params:
                        test_params.append(unpassed_test_param)
                test_args["test"] = test_params
                test_args.pop("class", None)
                setattr(config, "testargs", test_args)

        for listener in self.listeners:
            LOG.debug("thread id %s, listener %s" % (self.thread_id, listener))
        driver_request = Request(self.thread_id, root_desc, self.listeners,
                                 config)
        execute_message.set_request(driver_request)
        return driver_request

    @classmethod
    def _get_unpassed_test_params(cls, history_report_path, module_name):
        """
        Look up the unpassed test params recorded for a module in the
        task info of a history report, first by the full module name and
        then by the part before its first "."
        :param history_report_path: path of the history report
        :param module_name: name of the module
        :return: list of unpassed test params, empty if nothing is found
        """
        unpassed_test_params = []
        from _core.report.result_reporter import ResultReporter
        params = ResultReporter.get_task_info_params(history_report_path)
        if not params:
            return unpassed_test_params

        # NOTE(review): index 3 is presumably the module -> failed list
        # mapping of the task info record - confirm against ResultReporter
        failed_list = params[3].get(module_name, [])
        if not failed_list:
            failed_list = params[3].get(str(module_name).split(".")[0], [])
        unpassed_test_params.extend(failed_list)
        LOG.debug("get unpassed test params %s", unpassed_test_params)
        return unpassed_test_params

    @classmethod
    def _append_unpassed_test_param(cls, history_report_file,
                                    unpassed_test_params):
        """
        Append "suite#classname#name" for every case of the history report
        file that is not passed
        :param history_report_file: the data report to parse
        :param unpassed_test_params: list extended in place
        :return:
        """
        testsuites_element = DataHelper.parse_data_report(history_report_file)
        for testsuite_element in testsuites_element:
            suite_name = testsuite_element.get("name", "")
            suite = Suite()
            suite.set_cases(testsuite_element)
            for case in suite.cases:
                if case.is_passed():
                    continue
                unpassed_test_param = "{}#{}#{}".format(
                    suite_name, case.classname, case.name)
                unpassed_test_params.append(unpassed_test_param)

    def _inherit_execute_result(self, execute_result, root_desc):
        """
        Merge the history execute result of the module into the current
        execute result (retry mode).

        When not in decc mode and the current result file does not exist,
        the history result file is copied into the "result" directory of
        the report path and the copy is returned
        :param execute_result: the result reported by the driver
        :param root_desc: the test descriptor of the module
        :return: the execute result to publish
        """
        module_name = root_desc.source.module_name
        execute_result_name = "%s.xml" % module_name
        history_execute_result = self._get_history_execute_result(
            execute_result_name)
        if not history_execute_result:
            LOG.warning("%s no history execute result exists",
                        execute_result_name)
            return execute_result

        if not check_mode(ModeType.decc):
            if not os.path.exists(execute_result):
                result_dir = \
                    os.path.join(self.task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = os.path.join(result_dir,
                                                     execute_result_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("copy %s to %s" % (history_execute_result,
                                            target_execute_result))
                return target_execute_result

        real_execute_result = self._get_real_execute_result(execute_result)

        # inherit history execute result
        testsuites_element = DataHelper.parse_data_report(real_execute_result)
        if self._is_empty_report(testsuites_element):
            if check_mode(ModeType.decc):
                LOG.info("empty report no need to inherit history execute"
                         " result")
            else:
                LOG.info("empty report '%s' no need to inherit history execute"
                         " result", history_execute_result)
            return execute_result

        real_history_execute_result = self._get_real_history_execute_result(
            history_execute_result, module_name)

        history_testsuites_element = DataHelper.parse_data_report(
            real_history_execute_result)
        if self._is_empty_report(history_testsuites_element):
            LOG.info("history report '%s' is empty", history_execute_result)
            return execute_result
        if check_mode(ModeType.decc):
            LOG.info("inherit history execute result")
        else:
            LOG.info("inherit history execute result: %s",
                     history_execute_result)
        self._inherit_element(history_testsuites_element, testsuites_element)

        if check_mode(ModeType.decc):
            from xdevice import SuiteReporter
            SuiteReporter.append_report_result(
                (execute_result, DataHelper.to_string(testsuites_element)))
        else:
            # generate inherit execute result
            DataHelper.generate_report(testsuites_element, execute_result)
        return execute_result

    def _inherit_element(self, history_testsuites_element, testsuites_element):
        """
        Merge history suites into the current suites in place.

        A history suite without a same-named current suite is appended as
        a whole; otherwise only its passed cases are appended. The "tests"
        counters of the touched suite and of the root are increased
        :param history_testsuites_element: root element of history report
        :param testsuites_element: root element of current report
        :return:
        """
        for history_testsuite_element in history_testsuites_element:
            history_testsuite_name = history_testsuite_element.get("name", "")
            target_testsuite_element = None
            for testsuite_element in testsuites_element:
                if history_testsuite_name == testsuite_element.get("name", ""):
                    target_testsuite_element = testsuite_element
                    break

            if target_testsuite_element is None:
                testsuites_element.append(history_testsuite_element)
                inherited_test = int(testsuites_element.get(
                    ReportConstant.tests, 0)) + int(
                    history_testsuite_element.get(ReportConstant.tests, 0))
                testsuites_element.set(ReportConstant.tests,
                                       str(inherited_test))
                continue

            pass_num = 0
            for history_testcase_element in history_testsuite_element:
                if self._check_testcase_pass(history_testcase_element):
                    target_testsuite_element.append(history_testcase_element)
                    pass_num += 1

            inherited_test = int(target_testsuite_element.get(
                ReportConstant.tests, 0)) + pass_num
            target_testsuite_element.set(ReportConstant.tests,
                                         str(inherited_test))
            inherited_test = int(testsuites_element.get(
                ReportConstant.tests, 0)) + pass_num
            testsuites_element.set(ReportConstant.tests, str(inherited_test))

    def _get_history_execute_result(self, execute_result_name):
        """
        Find the history execute result of a module: first in the task
        info record, then by walking the history report path
        :param execute_result_name: module name, with or without ".xml"
        :return: path of the history execute result, falsy if not found
        """
        if execute_result_name.endswith(".xml"):
            execute_result_name = execute_result_name[:-4]
        history_execute_result = \
            self._get_data_report_from_record(execute_result_name)
        if history_execute_result:
            return history_execute_result
        # The ".xml" suffix was stripped for the record lookup above, so
        # the suffixed name has to be accepted here as well, otherwise a
        # "<name>.xml" report file can never match
        file_suffixes = (execute_result_name, "%s.xml" % execute_result_name)
        for root_dir, _, files in os.walk(
                self.task.config.history_report_path):
            for result_file in files:
                if result_file.endswith(file_suffixes):
                    # no break: the last matching file wins
                    history_execute_result = os.path.abspath(
                        os.path.join(root_dir, result_file))
        return history_execute_result

    @classmethod
    def _check_testcase_pass(cls, history_testcase_element):
        """
        Build a Case from the attributes of a history testcase element and
        tell whether it is passed. When the element has children, an empty
        result is set to ReportConstant.false and the message is taken
        from the first child
        :param history_testcase_element: the testcase element
        :return: the value of Case.is_passed()
        """
        case = Case()
        case.result = history_testcase_element.get(ReportConstant.result, "")
        case.status = history_testcase_element.get(ReportConstant.status, "")
        case.message = history_testcase_element.get(ReportConstant.message, "")
        if len(history_testcase_element) > 0:
            if not case.result:
                case.result = ReportConstant.false
            case.message = history_testcase_element[0].get(
                ReportConstant.message)

        return case.is_passed()

    @classmethod
    def _is_empty_report(cls, testsuites_element):
        """
        A report is empty when it has no suite, or exactly one suite whose
        "unavailable" counter is greater than 0
        :param testsuites_element: root element of the report
        :return: True if the report is empty
        """
        if len(testsuites_element) < 1:
            return True
        if len(testsuites_element) >= 2:
            return False

        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
            return True
        return False

    def _get_data_report_from_record(self, execute_result_name):
        """
        Look up the data report of a module in the task info record of the
        history report, first by the full name and then by the part before
        its first "."
        :param execute_result_name: module name without ".xml"
        :return: the recorded value, "" if nothing is found
        """
        history_report_path = \
            getattr(self.task.config, "history_report_path", "")
        if history_report_path:
            from _core.report.result_reporter import ResultReporter
            params = ResultReporter.get_task_info_params(history_report_path)
            if params:
                # NOTE(review): index 4 is presumably the module -> data
                # report mapping - confirm against ResultReporter
                report_data_dict = dict(params[4])
                if execute_result_name in report_data_dict:
                    return report_data_dict.get(execute_result_name)
                short_name = execute_result_name.split(".")[0]
                if short_name in report_data_dict:
                    return report_data_dict.get(short_name)
        return ""

    @classmethod
    def _get_real_execute_result(cls, execute_result):
        """
        In decc mode return the report result stored in SuiteReporter whose
        name matches execute_result regardless of extension ("" if there
        is none); otherwise return execute_result unchanged
        :param execute_result: the result reported by the driver
        :return: the data to parse as current report
        """
        from xdevice import SuiteReporter
        LOG.debug("get_real_execute_result length is: %s" %
                  len(SuiteReporter.get_report_result()))
        if check_mode(ModeType.decc):
            for suite_report, report_result in \
                    SuiteReporter.get_report_result():
                if os.path.splitext(suite_report)[0] == \
                        os.path.splitext(execute_result)[0]:
                    return report_result
            return ""
        else:
            return execute_result

    @classmethod
    def _get_real_history_execute_result(cls, history_execute_result,
                                         module_name):
        """
        In decc mode return the history result that SuiteReporter holds for
        the module; otherwise return history_execute_result unchanged
        :param history_execute_result: path of the history result
        :param module_name: name of the module
        :return: the data to parse as history report
        """
        from xdevice import SuiteReporter
        LOG.debug("get_real_history_execute_result: %s" %
                  SuiteReporter.history_report_result)
        if check_mode(ModeType.decc):
            _, report_result = SuiteReporter. \
                get_history_result_by_module(module_name)
            return report_result
        else:
            return history_execute_result


class QueueMonitorThread(threading.Thread):
    """
    Thread that consumes ExecuteMessage objects from the message queue and
    removes the finished threads from the running driver threads
    """

    def __init__(self, message_queue, current_driver_threads, test_drivers):
        """
        :param message_queue: queue the execute messages are read from
        :param current_driver_threads: mapping keyed by thread id; entries
            are popped when their message arrives
        :param test_drivers: collection of drivers; the monitor keeps
            running while it is non-empty
        """
        threading.Thread.__init__(self)
        self.message_queue = message_queue
        self.current_driver_threads = current_driver_threads
        self.test_drivers = test_drivers

    def run(self):
        """
        Loop while there are test drivers or running driver threads: wait
        for the next execute message, drop its thread from the running
        threads and upload the module result when an upload address is set.
        Reports "terminate success" afterwards if the scheduler is no
        longer executing
        :return:
        """
        from xdevice import Scheduler
        LOG.debug("queue monitor thread start")
        while self.test_drivers or self.current_driver_threads:
            if not self.current_driver_threads:
                # nothing is running yet, poll again later
                time.sleep(3)
                continue
            execute_message = self.message_queue.get()

            self.current_driver_threads.pop(execute_message.get_thread_id())

            if execute_message.get_state() == ExecuteMessage.DEVICE_FINISH:
                LOG.debug("thread id: %s execute finished" %
                          execute_message.get_thread_id())
            elif execute_message.get_state() == ExecuteMessage.DEVICE_ERROR:
                LOG.debug("thread id: %s execute error" %
                          execute_message.get_thread_id())

            if Scheduler.upload_address:
                Scheduler.upload_module_result(execute_message)

        LOG.debug("queue monitor thread end")
        if not Scheduler.is_execute:
            LOG.info("terminate success")
            Scheduler.terminate_result.put("terminate success")


class ExecuteMessage:
    """Outcome of one driver thread: state, environment, request, result."""

    DEVICE_RUN = 'device_run'
    DEVICE_FINISH = 'device_finish'
    DEVICE_ERROR = 'device_error'

    def __init__(self, state, environment, drivers, thread_id):
        """
        :param state: initial state, one of the DEVICE_* values or ''
        :param environment: the environment the thread ran on
        :param drivers: the test driver handled by the thread
        :param thread_id: identifier of the thread
        """
        self.state = state
        self.environment = environment
        self.drivers = drivers
        self.thread_id = thread_id
        self.request = None
        self.result = None

    def set_state(self, state):
        self.state = state

    def get_state(self):
        return self.state

    def set_request(self, request):
        self.request = request

    def get_request(self):
        return self.request

    def set_result(self, result):
        self.result = result

    def get_result(self):
        return self.result

    def get_environment(self):
        return self.environment

    def get_thread_id(self):
        return self.thread_id

    def get_drivers(self):
        return self.drivers