• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import shutil
22import threading
23import time
24from concurrent.futures import ThreadPoolExecutor
25from concurrent.futures import wait
26
27from _core.constants import ModeType
28from _core.constants import ConfigConst
29from _core.constants import ReportConst
30from _core.error import ErrorMessage
31from _core.executor.request import Request
32from _core.logger import platform_logger
33from _core.logger import redirect_driver_log_begin
34from _core.logger import redirect_driver_log_end
35from _core.plugin import Config
36from _core.plugin import get_plugin
37from _core.plugin import Plugin
38from _core.utils import calculate_elapsed_time
39from _core.utils import check_mode
40from _core.utils import get_file_absolute_path
41from _core.utils import get_kit_instances
42from _core.utils import check_device_name
43from _core.utils import check_device_env_index
44from _core.utils import get_repeat_round
45from _core.exception import ParamError
46from _core.exception import ExecuteTerminate
47from _core.exception import DeviceError
48from _core.exception import LiteDeviceError
49from _core.report.reporter_helper import ReportConstant
50from _core.report.reporter_helper import DataHelper
51from _core.report.reporter_helper import Suite
52from _core.report.reporter_helper import Case
53from _core.context.center import Context
54from _core.context.handler import handle_repeat_result
55from _core.context.handler import update_report_xml
56from _core.context.upload import Uploader
57from _core.testkit.json_parser import JsonParser
58
59LOG = platform_logger("Concurrent")
60
61
class Concurrent:
    """Thread-pool helpers for running one callable over many argument tuples."""

    @classmethod
    def executor_callback(cls, worker):
        """Log the exception carried by a finished future, if there is one."""
        error = worker.exception()
        if not error:
            return
        LOG.error("Worker return exception: {}".format(error))

    @classmethod
    def concurrent_execute(cls, func, params_list, max_size=8):
        """
        Provide the ability to execute the target function concurrently
        :param func: target function
        :param params_list: iterable of positional-argument sequences, one
            per call of the target function
        :param max_size: the max number of threads in the thread pool
        :return: list of (result, params) tuples in submission order; the
            exception of a failed call is re-raised when its result is read
        """
        with ThreadPoolExecutor(max_size) as pool:
            submitted = {}
            for args in params_list:
                task = pool.submit(func, *args)
                submitted[task] = args
                task.add_done_callback(cls.executor_callback)
            # block until every submitted call has finished
            wait(submitted)
            return [(task.result(), args) for task, args in submitted.items()]
89
90
91class DriversThread(threading.Thread):
92    def __init__(self, test_driver, task, environment, message_queue):
93        super().__init__()
94        self.test_driver = test_driver
95        self.listeners = None
96        self.task = task
97        self.environment = environment
98        self.message_queue = message_queue
99        self.error_message = ""
100        self.module_config_kits = None
101        self.repeat = 1
102        self.repeat_round = 1
103        self.start_time = time.time()
104
105    def set_listeners(self, listeners):
106        self.listeners = listeners
107        if self.environment is None:
108            return
109
110        for listener in listeners:
111            listener.device_sn = self.environment.devices[0].device_sn
112
113    def get_driver_log_file(self, test):
114        self.repeat = Context.get_scheduler().get_repeat_index()
115        self.repeat_round = get_repeat_round(test.unique_id)
116        round_folder = f"round{self.repeat_round}" if self.repeat > 1 else ""
117        log_file = os.path.join(
118            self.task.config.log_path, round_folder,
119            test.source.module_name, ReportConstant.module_run_log)
120        return log_file
121
122    def run(self):
123        driver, test = None, None
124        if self.test_driver and Context.is_executing():
125            driver, test = self.test_driver
126        if driver is None or test is None:
127            return
128        redirect_driver_log_begin(self.ident, self.get_driver_log_file(test))
129        LOG.debug("Thread %s start" % self.name)
130        execute_message = ExecuteMessage('', self.environment, self.test_driver, self.name)
131        try:
132            # construct params
133            driver_request = self._get_driver_request(test, execute_message)
134            if driver_request is None:
135                return
136            # setup device
137            self._do_task_setup(driver_request)
138            # driver execute
139            self.reset_device(driver_request.config)
140            driver.__execute__(driver_request)
141        except Exception as exception:
142            error_no = getattr(exception, "error_no", "00000")
143            if self.environment is None:
144                LOG.exception("Exception: %s", exception, exc_info=False,
145                              error_no=error_no)
146            else:
147                LOG.exception(
148                    "Device: %s, exception: %s" % (
149                        self.environment.__get_serial__(), exception),
150                    exc_info=False, error_no=error_no)
151            self.error_message = str(exception)
152
153        finally:
154            self._do_commom_module_kit_teardown()
155            self._handle_finally(driver, test, execute_message)
156        redirect_driver_log_end(self.ident)
157
158    @staticmethod
159    def reset_device(config):
160        if getattr(config, "reboot_per_module", False):
161            for device in config.environment.devices:
162                device.reboot()
163
    def _handle_finally(self, driver, test, execute_message):
        """
        Finish one driver execution: collect and post-process the result,
        set the final state, release the environment and publish the message.
        :param driver: driver object that was executed
        :param test: test descriptor; its source is read unconditionally
        :param execute_message: message object that is filled in and put on
            self.message_queue
        """
        source_content = test.source.source_file or test.source.source_string
        end_time = time.time()
        # self.start_time is taken in __init__, not when run() starts
        LOG.info("Executed: %s, Execution Time: %s" % (
            source_content, calculate_elapsed_time(self.start_time, end_time)))

        # collect and post-process the driver result
        if driver and test:
            execute_result = driver.__result__()
            # move result file to round folder when repeat > 1
            if self.repeat > 1:
                execute_result = handle_repeat_result(
                    execute_result, self.task.config.report_path,
                    round_folder=f"round{self.repeat_round}")
            LOG.debug("Execute result: %s" % execute_result)

            # update result xml
            update_props = {
                ReportConstant.start_time: time.strftime(
                    ReportConstant.time_format, time.localtime(int(self.start_time))),
                ReportConstant.end_time: time.strftime(
                    ReportConstant.time_format, time.localtime(int(end_time))),
                ReportConstant.repeat: str(self.repeat),
                ReportConstant.round: str(self.repeat_round),
                ReportConstant.test_type: test.source.test_type
            }
            if self.environment is not None:
                update_props.update({ReportConstant.devices: self.environment.get_description()})
            update_report_xml(execute_result, update_props)

            # inherit history report when a history report path is configured
            if getattr(self.task.config, "history_report_path", ""):
                execute_result = self._inherit_execute_result(execute_result, test)
            execute_message.set_result(execute_result)

        # set execute state: an error message recorded by run() marks a failure
        if self.error_message:
            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
        else:
            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)

        # free environment
        if self.environment:
            LOG.debug("Thread %s free environment", execute_message.get_thread_name())
            Context.get_scheduler().__free_environment__(execute_message.get_environment())

        # publish the message only after the environment has been released
        LOG.debug("Thread %s put result", self.name)
        self.message_queue.put(execute_message)
        LOG.info("Thread %s end", self.name)
212
213    def _do_common_module_kit_setup(self, driver_request):
214        for device in self.environment.devices:
215            setattr(device, ConfigConst.common_module_kits, [])
216
217        for kit in self.module_config_kits:
218            run_flag = False
219            for device in self.environment.devices:
220                if not Context.is_executing():
221                    raise ExecuteTerminate()
222                if not check_device_env_index(device, kit):
223                    continue
224                if check_device_name(device, kit):
225                    run_flag = True
226                    kit_copy = copy.deepcopy(kit)
227                    module_kits = getattr(device, ConfigConst.common_module_kits)
228                    module_kits.append(kit_copy)
229                    kit_copy.__setup__(device, request=driver_request)
230            if not run_flag:
231                err_msg = ErrorMessage.Common.Code_0101004.format(kit.__class__.__name__)
232                LOG.error(err_msg)
233                raise ParamError(err_msg)
234
235    def _do_commom_module_kit_teardown(self):
236        try:
237            for device in self.environment.devices:
238                for kit in getattr(device, ConfigConst.common_module_kits, []):
239                    if check_device_name(device, kit, step="teardown"):
240                        kit.__teardown__(device)
241                setattr(device, ConfigConst.common_module_kits, [])
242        except Exception as e:
243            LOG.error("common module kit teardown error: {}".format(e))
244
    def _do_task_setup(self, driver_request):
        """
        Prepare the devices of the environment before the driver runs.

        Does nothing in decc mode, when the request config has a truthy
        check-device flag, or when there is no environment. Otherwise it
        runs the common module kits (if a module config is set), then the
        task kits on every device that still needs kit setup, and finally
        copies the request's product info to the task if the task has none.
        :param driver_request: request whose config drives the setup
        """
        if check_mode(ModeType.decc) or getattr(
                driver_request.config, ConfigConst.check_device, False):
            return

        if self.environment is None:
            return

        # common module kits: only when the config names a module config file
        if (hasattr(driver_request.config, ConfigConst.module_config) and
                getattr(driver_request.config, ConfigConst.module_config, None)):
            module_config_path = getattr(driver_request.config, ConfigConst.module_config, None)
            LOG.debug("Common module config path: {}".format(module_config_path))
            from xdevice import Variables
            # the path is looked up with <exec_dir>/config as a search directory
            config_path = get_file_absolute_path(module_config_path,
                                                 [os.path.join(Variables.exec_dir, "config")])
            json_config = JsonParser(config_path)
            self.module_config_kits = get_kit_instances(json_config,
                                                        driver_request.config.resource_path,
                                                        driver_request.config.testcases_path)
            self._do_common_module_kit_setup(driver_request)

        for device in self.environment.devices:
            # the flag defaults to True, so a device without it is set up
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device
            # each device gets its own deep copy of the task kits
            kits_copy = copy.deepcopy(self.task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Context.is_executing():
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    # a failing kit is only logged; the remaining kits still run
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            self.environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
            # the flag is cleared even when the loop above stopped early or a kit failed
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # set product_info to self.task
        if getattr(driver_request, ConfigConst.product_info, "") and not \
                getattr(self.task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
                return
            setattr(self.task, ConfigConst.product_info, product_info)
299
300    def _get_driver_request(self, root_desc, execute_message):
301        config = Config()
302        config.update(copy.deepcopy(self.task.config).__dict__)
303        config.environment = self.environment
304        if getattr(config, "history_report_path", ""):
305            # modify config.testargs
306            history_report_path = getattr(config, "history_report_path", "")
307            module_name = root_desc.source.module_name
308            unpassed_test_params = self._get_unpassed_test_params(
309                history_report_path, module_name)
310            if not unpassed_test_params:
311                LOG.info("%s all test cases are passed, no need retry", module_name)
312                driver_request = Request(self.name, root_desc, self.listeners, config)
313                execute_message.set_request(driver_request)
314                return None
315            if unpassed_test_params[0] != module_name and \
316                    unpassed_test_params[0] != str(module_name).split(".")[0]:
317                test_args = getattr(config, "testargs", {})
318                test_params = []
319                for unpassed_test_param in unpassed_test_params:
320                    if unpassed_test_param not in test_params:
321                        test_params.append(unpassed_test_param)
322                test_args["test"] = test_params
323                if "class" in test_args.keys():
324                    test_args.pop("class")
325                setattr(config, "testargs", test_args)
326        if getattr(config, "tf_suite", ""):
327            if root_desc.source.module_name in config.tf_suite.keys():
328                config.tf_suite = config.tf_suite.get(
329                    root_desc.source.module_name)
330            else:
331                config.tf_suite = dict()
332        for listener in self.listeners:
333            LOG.debug("Thread %s, listener %s" % (self.name, listener))
334        driver_request = Request(self.name, root_desc, self.listeners, config)
335        execute_message.set_request(driver_request)
336        return driver_request
337
338    @classmethod
339    def _get_unpassed_test_params(cls, history_report_path, module_name):
340        unpassed_test_params = []
341        from _core.report.result_reporter import ResultReporter
342        params = ResultReporter.get_task_info_params(history_report_path)
343        if not params:
344            return unpassed_test_params
345        failed_list = []
346        try:
347            from devicetest.agent.decc import Handler
348            if Handler.DAV.retry_select:
349                for i in Handler.DAV.case_id_list:
350                    failed_list.append(i + "#" + i)
351            else:
352                failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
353        except Exception:
354            failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
355        if not failed_list:
356            failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], [])
357        unpassed_test_params.extend(failed_list)
358        LOG.debug("Get unpassed test params %s", unpassed_test_params)
359        return unpassed_test_params
360
361    @classmethod
362    def _append_unpassed_test_param(cls, history_report_file, unpassed_test_params):
363        testsuites_element = DataHelper.parse_data_report(history_report_file)
364        for testsuite_element in testsuites_element:
365            suite_name = testsuite_element.get("name", "")
366            suite = Suite()
367            suite.set_cases(testsuite_element)
368            for case in suite.cases:
369                if case.is_passed():
370                    continue
371                unpassed_test_param = "{}#{}#{}".format(
372                    suite_name, case.classname, case.name)
373                unpassed_test_params.append(unpassed_test_param)
374
    def _inherit_execute_result(self, execute_result, root_desc):
        """
        Merge the history execute result of a module into the current one.
        :param execute_result: current execute result (a file path outside
            decc mode)
        :param root_desc: test descriptor providing source.module_name
        :return: execute_result, or outside decc mode the path of the copied
            history result when the current result file does not exist
        """
        module_name = root_desc.source.module_name
        execute_result_name = "%s.xml" % module_name
        history_execute_result = self._get_history_execute_result(
            execute_result_name)
        if not history_execute_result:
            LOG.warning("%s no history execute result exists",
                        execute_result_name)
            return execute_result

        if not check_mode(ModeType.decc):
            # no current result file: reuse the history file as the result
            if not os.path.exists(execute_result):
                result_dir = \
                    os.path.join(self.task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = os.path.join(result_dir,
                                                     execute_result_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("Copy %s to %s" % (history_execute_result,
                                            target_execute_result))
                return target_execute_result

        # in decc mode this is the stored report content, otherwise the path
        real_execute_result = self._get_real_execute_result(execute_result)

        # inherit history execute result
        testsuites_element = DataHelper.parse_data_report(real_execute_result)
        if self._is_empty_report(testsuites_element):
            if check_mode(ModeType.decc):
                LOG.info("Empty report no need to inherit history execute"
                         " result")
            else:
                LOG.info("Empty report '%s' no need to inherit history execute"
                         " result", history_execute_result)
            return execute_result

        real_history_execute_result = self._get_real_history_execute_result(
            history_execute_result, module_name)

        history_testsuites_element = DataHelper.parse_data_report(
            real_history_execute_result)
        if self._is_empty_report(history_testsuites_element):
            LOG.info("History report '%s' is empty", history_execute_result)
            return execute_result
        if check_mode(ModeType.decc):
            LOG.info("Inherit history execute result")
        else:
            LOG.info("Inherit history execute result: %s",
                     history_execute_result)
        # merges the history suites/cases into testsuites_element in place
        self._inherit_element(history_testsuites_element, testsuites_element)

        if check_mode(ModeType.decc):
            # decc mode keeps results in SuiteReporter instead of writing a file
            from xdevice import SuiteReporter
            SuiteReporter.append_report_result(
                (execute_result, DataHelper.to_string(testsuites_element)))
        else:
            # generate inherit execute result
            DataHelper.generate_report(testsuites_element, execute_result)
        return execute_result
433
434    def _inherit_element(self, history_testsuites_element, testsuites_element):
435        for history_testsuite_element in history_testsuites_element:
436            history_testsuite_name = history_testsuite_element.get("name", "")
437            target_testsuite_element = None
438            for testsuite_element in testsuites_element:
439                if history_testsuite_name == testsuite_element.get("name", ""):
440                    target_testsuite_element = testsuite_element
441                    break
442
443            if target_testsuite_element is None:
444                testsuites_element.append(history_testsuite_element)
445                inherited_test = int(testsuites_element.get(
446                    ReportConstant.tests, 0)) + int(
447                    history_testsuite_element.get(ReportConstant.tests, 0))
448                testsuites_element.set(ReportConstant.tests,
449                                       str(inherited_test))
450                continue
451
452            pass_num = 0
453            for history_testcase_element in history_testsuite_element:
454                if self._check_testcase_pass(history_testcase_element):
455                    target_testsuite_element.append(history_testcase_element)
456                    pass_num += 1
457
458            inherited_test = int(target_testsuite_element.get(
459                ReportConstant.tests, 0)) + pass_num
460            target_testsuite_element.set(ReportConstant.tests,
461                                         str(inherited_test))
462            inherited_test = int(testsuites_element.get(
463                ReportConstant.tests, 0)) + pass_num
464            testsuites_element.set(ReportConstant.tests, str(inherited_test))
465
466    def _get_history_execute_result(self, execute_result_name):
467        if execute_result_name.endswith(".xml"):
468            execute_result_name = execute_result_name[:-4]
469        history_execute_result = \
470            self._get_data_report_from_record(execute_result_name)
471        if history_execute_result:
472            return history_execute_result
473        for root_dir, _, files in os.walk(
474                self.task.config.history_report_path):
475            for result_file in files:
476                if result_file.endswith(execute_result_name):
477                    history_execute_result = os.path.abspath(
478                        os.path.join(root_dir, result_file))
479        return history_execute_result
480
481    @classmethod
482    def _check_testcase_pass(cls, history_testcase_element):
483        case = Case()
484        case.result = history_testcase_element.get(ReportConstant.result, "")
485        case.status = history_testcase_element.get(ReportConstant.status, "")
486        case.message = history_testcase_element.get(ReportConstant.message, "")
487        if len(history_testcase_element) > 0:
488            if not case.result:
489                case.result = ReportConstant.false
490            case.message = history_testcase_element[0].get(
491                ReportConstant.message)
492
493        return case.is_passed()
494
495    @classmethod
496    def _is_empty_report(cls, testsuites_element):
497        if len(testsuites_element) < 1:
498            return True
499        if len(testsuites_element) >= 2:
500            return False
501
502        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
503            return True
504        return False
505
506    def _get_data_report_from_record(self, execute_result_name):
507        history_report_path = \
508            getattr(self.task.config, "history_report_path", "")
509        if history_report_path:
510            from _core.report.result_reporter import ResultReporter
511            params = ResultReporter.get_task_info_params(history_report_path)
512            if params:
513                report_data_dict = dict(params[ReportConst.data_reports])
514                if execute_result_name in report_data_dict.keys():
515                    return report_data_dict.get(execute_result_name)
516                elif execute_result_name.split(".")[0] in \
517                        report_data_dict.keys():
518                    return report_data_dict.get(
519                        execute_result_name.split(".")[0])
520        return ""
521
522    @classmethod
523    def _get_real_execute_result(cls, execute_result):
524        from xdevice import SuiteReporter
525        LOG.debug("Get real execute result length is: %s" %
526                  len(SuiteReporter.get_report_result()))
527        if check_mode(ModeType.decc):
528            for suite_report, report_result in \
529                    SuiteReporter.get_report_result():
530                if os.path.splitext(suite_report)[0] == \
531                        os.path.splitext(execute_result)[0]:
532                    return report_result
533            return ""
534        else:
535            return execute_result
536
537    @classmethod
538    def _get_real_history_execute_result(cls, history_execute_result,
539                                         module_name):
540        from xdevice import SuiteReporter
541        LOG.debug("Get real history execute result: %s" %
542                  SuiteReporter.history_report_result)
543        if check_mode(ModeType.decc):
544            virtual_report_path, report_result = SuiteReporter. \
545                get_history_result_by_module(module_name)
546            return report_result
547        else:
548            return history_execute_result
549
550
551class DriversDryRunThread(threading.Thread):
552    def __init__(self, test_driver, task, environment, message_queue):
553        super().__init__()
554        self.test_driver = test_driver
555        self.listeners = None
556        self.task = task
557        self.environment = environment
558        self.message_queue = message_queue
559        self.error_message = ""
560
561    def run(self):
562        LOG.debug("Thread %s start" % self.name)
563        start_time = time.time()
564        execute_message = ExecuteMessage('', self.environment, self.test_driver, self.name)
565        driver, test = None, None
566        try:
567            if self.test_driver and Context.is_executing():
568                # construct params
569                driver, test = self.test_driver
570                driver_request = self._get_driver_request(test,
571                                                          execute_message)
572                if driver_request is None:
573                    return
574
575                # setup device
576                self._do_task_setup(driver_request)
577
578                # driver execute
579                self.reset_device(driver_request.config)
580                driver.__dry_run_execute__(driver_request)
581
582        except Exception as exception:
583            error_no = getattr(exception, "error_no", "00000")
584            if self.environment is None:
585                LOG.exception("Exception: %s", exception, exc_info=False,
586                              error_no=error_no)
587            else:
588                LOG.exception(
589                    "Device: %s, exception: %s" % (
590                        self.environment.__get_serial__(), exception),
591                    exc_info=False, error_no=error_no)
592            self.error_message = str(exception)
593
594        finally:
595            self._handle_finally(driver, execute_message, start_time, test)
596
597    @staticmethod
598    def reset_device(config):
599        if getattr(config, "reboot_per_module", False):
600            for device in config.environment.devices:
601                device.reboot()
602
603    def _handle_finally(self, driver, execute_message, start_time, test):
604        source_content = (self.test_driver[1].source.source_file
605                          or self.test_driver[1].source.source_string)
606        LOG.info("Executed: %s, Execution Time: %s" % (
607            source_content, calculate_elapsed_time(start_time, time.time())))
608
609        # set execute state
610        if self.error_message:
611            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
612        else:
613            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)
614
615        # free environment
616        if self.environment:
617            LOG.debug("Thread %s free environment", execute_message.get_thread_name())
618            Context.get_scheduler().__free_environment__(execute_message.get_environment())
619
620        LOG.debug("Thread %s put result", self.name)
621        self.message_queue.put(execute_message)
622
623    def _do_task_setup(self, driver_request):
624        if check_mode(ModeType.decc) or getattr(
625                driver_request.config, ConfigConst.check_device, False):
626            return
627
628        if self.environment is None:
629            return
630
631        for device in self.environment.devices:
632            if not getattr(device, ConfigConst.need_kit_setup, True):
633                LOG.debug("Device %s need kit setup is false" % device)
634                continue
635
636            # do task setup for device
637            kits_copy = copy.deepcopy(self.task.config.kits)
638            setattr(device, ConfigConst.task_kits, kits_copy)
639            for kit in getattr(device, ConfigConst.task_kits, []):
640                if not Context.is_executing():
641                    break
642                try:
643                    kit.__setup__(device, request=driver_request)
644                except (ParamError, ExecuteTerminate, DeviceError,
645                        LiteDeviceError, ValueError, TypeError,
646                        SyntaxError, AttributeError) as exception:
647                    error_no = getattr(exception, "error_no", "00000")
648                    LOG.exception(
649                        "Task setup device: %s, exception: %s" % (
650                            self.environment.__get_serial__(),
651                            exception), exc_info=False, error_no=error_no)
652            LOG.debug("Set device %s need kit setup to false" % device)
653            setattr(device, ConfigConst.need_kit_setup, False)
654
655        # set product_info to self.task
656        if getattr(driver_request, ConfigConst.product_info, "") and not \
657                getattr(self.task, ConfigConst.product_info, ""):
658            product_info = getattr(driver_request, ConfigConst.product_info)
659            if not isinstance(product_info, dict):
660                LOG.warning("Product info should be dict, %s",
661                            product_info)
662                return
663            setattr(self.task, ConfigConst.product_info, product_info)
664
665    def _get_driver_request(self, root_desc, execute_message):
666        config = Config()
667        config.update(copy.deepcopy(self.task.config).__dict__)
668        config.environment = self.environment
669        if self.listeners:
670            for listener in self.listeners:
671                LOG.debug("Thread %s, listener %s" % (self.name, listener))
672        driver_request = Request(self.name, root_desc, self.listeners, config)
673        execute_message.set_request(driver_request)
674        return driver_request
675
676
class QueueMonitorThread(threading.Thread):
    """Thread that consumes execute messages posted on the message queue.

    For every message received, the sending thread is removed from the map
    of running driver threads and the module result is uploaded. The loop
    ends once both ``test_drivers`` and ``current_driver_threads`` are empty.
    """

    def __init__(self, message_queue, current_driver_threads, test_drivers):
        """Store the shared queue and bookkeeping containers.

        :param message_queue: queue-like object; ``get()`` is called on it
        :param current_driver_threads: mapping of thread name to thread
        :param test_drivers: container whose truthiness keeps the monitor
            alive (presumably the drivers not yet dispatched -- confirm
            against the scheduler)
        """
        super().__init__()
        self.message_queue = message_queue
        self.current_driver_threads = current_driver_threads
        self.test_drivers = test_drivers

    def check_current_thread_status(self):
        """Log, for each tracked driver thread, whether it is still alive."""
        for tid_key, driver_thread in self.current_driver_threads.items():
            # NOTE: a dead thread is only reported here; nothing is freed
            template = ("Running thread is alive, thread is {}."
                        if driver_thread.is_alive()
                        else "Running thread is dead, thread is {}")
            LOG.debug(template.format(tid_key))

    def run(self):
        """Process execute messages until no drivers or threads remain."""
        LOG.debug("Queue monitor thread start")
        while self.test_drivers or self.current_driver_threads:
            if self.current_driver_threads:
                self._handle_next_message()
            else:
                # drivers are pending but none is running yet: poll again
                time.sleep(3)

        LOG.debug("Queue monitor thread end")
        if Context.is_executing():
            return
        # execution was stopped: report the termination to the scheduler
        LOG.info("Terminate success")
        Context.get_scheduler().terminate_result.put("terminate success")

    def _handle_next_message(self):
        """Block for one execute message, untrack its thread, upload result."""
        message = self.message_queue.get()
        thread_name = message.get_thread_name()
        self.current_driver_threads.pop(thread_name)

        state = message.get_state()
        if state == ExecuteMessage.DEVICE_FINISH:
            LOG.debug("Thread %s execute finished" % thread_name)
        elif state == ExecuteMessage.DEVICE_ERROR:
            LOG.debug("Thread %s execute error" % thread_name)
        Uploader.upload_module_result(message)
713
714
class ExecuteMessage:
    """State record describing one driver thread's execution.

    It carries the execution state, the environment and drivers the thread
    was created with, the thread name, and (once set) the request and the
    result.
    """

    # values used for the ``state`` field
    DEVICE_RUN = 'device_run'
    DEVICE_FINISH = 'device_finish'
    DEVICE_ERROR = 'device_error'

    def __init__(self, state, environment, drivers, thread_name):
        """Create a message; request and result start out as ``None``."""
        self.state = state
        self.environment = environment
        self.drivers = drivers
        self.thread_name = thread_name
        # filled in later through set_request() / set_result()
        self.request = None
        self.result = None

    # --- fields fixed at construction time -------------------------------

    def get_thread_name(self):
        """Return the name of the thread this message belongs to."""
        return self.thread_name

    def get_environment(self):
        """Return the environment given at construction."""
        return self.environment

    def get_drivers(self):
        """Return the drivers given at construction."""
        return self.drivers

    # --- mutable fields ---------------------------------------------------

    def get_state(self):
        """Return the current state value."""
        return self.state

    def set_state(self, state):
        """Replace the current state value."""
        self.state = state

    def get_request(self):
        """Return the stored request, or ``None`` if none was set."""
        return self.request

    def set_request(self, request):
        """Store the request associated with this execution."""
        self.request = request

    def get_result(self):
        """Return the stored result, or ``None`` if none was set."""
        return self.result

    def set_result(self, result):
        """Store the result of this execution."""
        self.result = result
754
755
class ModuleThread(DriversThread):
    """Driver thread that also manages per-subsystem "common" kits.

    In addition to the task setup inherited from ``DriversThread``, each run
    decrements the counter kept in ``task.root.task_list`` for the module's
    subsystem, sets up the kits registered for that subsystem in
    ``task.root.common_kits`` on every device, and tears them down once the
    counter has dropped to zero or below. Counter updates and teardown are
    serialized through the lock passed to the constructor.
    """

    def __init__(self, test_driver, task, environment, message_queue, lock):
        """Create the thread.

        :param test_driver: ``(driver, test)`` pair executed by this thread
        :param task: task providing ``config`` and ``root``
        :param environment: environment whose ``devices`` are used; ``run``
            tolerates ``None`` here
        :param message_queue: forwarded unchanged to ``DriversThread``
        :param lock: lock object guarding ``task.root.task_list`` updates and
            common kit teardown (must support the ``with`` statement, as
            ``threading.Lock`` does)
        """
        super().__init__(test_driver, task, environment, message_queue)
        self.lock = lock

    def run(self):
        """Execute the driver with task-level and common kits applied.

        Nothing happens when there is no test driver or execution has been
        stopped. Any exception raised while preparing or executing the
        driver is logged and stored in ``self.error_message``; cleanup
        (common kit teardown, ``_handle_finally`` and ending the driver log
        redirection) runs in every case.
        """
        driver, test = None, None
        if self.test_driver and Context.is_executing():
            driver, test = self.test_driver
        if driver is None or test is None:
            return
        redirect_driver_log_begin(self.ident, self.get_driver_log_file(test))
        LOG.debug("Thread %s start" % self.name)
        execute_message = ExecuteMessage('', self.environment, self.test_driver, self.name)
        try:
            # construct params
            driver_request = self._get_driver_request(test, execute_message)
            if driver_request is None:
                return

            # setup device
            self._do_task_setup(driver_request)

            # do common kit setup
            self._do_common_kit_setup(self.environment, self.task, self.test_driver)

            # driver execute
            self.reset_device(driver_request.config)
            driver.__execute__(driver_request)
        except Exception as exception:
            error_no = getattr(exception, "error_no", "00000")
            if self.environment is None:
                LOG.exception("Exception: %s", exception, exc_info=False,
                              error_no=error_no)
            else:
                LOG.exception(
                    "Device: %s, exception: %s" % (
                        self.environment.__get_serial__(), exception),
                    exc_info=False, error_no=error_no)
            self.error_message = str(exception)
        finally:
            try:
                # do common kit teardown
                self.__do_common_kit_teardown()
                self._handle_finally(driver, test, execute_message)
            finally:
                # always pair redirect_driver_log_begin with its end, even on
                # the early return above or when cleanup itself raises
                redirect_driver_log_end(self.ident)

    def _do_common_kit_setup(self, environment, task, test_driver):
        """Account for this module and set up its subsystem's common kits.

        The subsystem counter in ``task.root.task_list`` is decremented
        under the lock; a failure to do so is logged and otherwise ignored.
        If kits are registered for the subsystem, fresh kit instances are
        attached to and set up on every device that is not already
        configured for this subsystem.

        :param environment: environment whose ``devices`` receive the kits
        :param task: task providing ``root.common_kits``, ``root.task_list``
            and the resource/testcases paths in ``config``
        :param test_driver: ``(driver, test)`` pair; the subsystem is read
            from ``test.source.module_subsystem``
        :raises ParamError: if a kit type has no registered plugin
        :raises ExecuteTerminate: if execution is stopped during kit setup
        """
        LOG.info("Do common setup kit")
        common_kits = task.root.common_kits
        module_subsystem = test_driver[1].source.module_subsystem

        LOG.info("lock acquire")
        with self.lock:
            try:
                task_list = task.root.task_list
                task_list.update(
                    {module_subsystem: task_list.get(module_subsystem) - 1})
            except Exception as e:
                # best effort: e.g. the subsystem has no counter entry
                LOG.error(e)
        LOG.info("lock release")

        kits = common_kits.get(module_subsystem, None)
        if not kits:
            return
        new_kits = self.get_kit_instances(kits, task.config.resource_path,
                                          task.config.testcases_path)
        LOG.info(new_kits)
        for device in environment.devices:
            current_subsystem = getattr(device, "current_subsystem_kit", None)
            if current_subsystem:
                if current_subsystem == module_subsystem:
                    # kits of this subsystem are already active on the device
                    continue
                # NOTE(review): teardown only happens once the previous
                # subsystem's counter is <= 0; otherwise its kits are
                # replaced below without being torn down -- confirm intent.
                self.__do_common_kit_teardown()
            setattr(device, "current_subsystem_kit", module_subsystem)
            setattr(device, ConfigConst.common_kits, new_kits)
            self.__do_common_kit_setup(device)

    def __do_common_kit_teardown(self):
        """Tear down common kits on devices whose subsystem is finished.

        For each device with an active subsystem, the kits are torn down
        and the device's kit attributes are cleared when the subsystem's
        counter in ``task.root.task_list`` is zero or below. Errors are
        logged per device and do not propagate.
        """
        if self.environment is None:
            # run() allows a missing environment; nothing to tear down
            return
        for device in self.environment.devices:
            module_subsystem = getattr(device, "current_subsystem_kit", None)
            if not module_subsystem:
                continue
            LOG.info("lock acquire")
            with self.lock:
                try:
                    remaining = self.task.root.task_list.get(module_subsystem)
                    LOG.info(remaining)
                    if remaining <= 0:
                        LOG.info("do common kit teardown")
                        for kit in getattr(device, ConfigConst.common_kits, []):
                            kit.__teardown__(device)
                        setattr(device, ConfigConst.common_kits, [])
                        setattr(device, "current_subsystem_kit", None)
                except Exception as e:
                    LOG.error(e)
            LOG.info("lock release")

    @staticmethod
    def __do_common_kit_setup(device):
        """Run ``__setup__`` of every common kit attached to ``device``.

        :raises ExecuteTerminate: if execution is stopped before a kit is
            set up
        """
        for kit in getattr(device, ConfigConst.common_kits, []):
            if not Context.is_executing():
                raise ExecuteTerminate()
            kit.__setup__(device, request=None)

    @staticmethod
    def get_kit_instances(kits, resource_path, testcases_path):
        """Instantiate and configure a test kit for each kit description.

        Each description dict gets its ``"paths"`` entry set to
        ``[resource_path, testcases_path]`` (the dict is modified in place)
        before being passed to the new kit's ``__check_config__``.

        :param kits: iterable of kit description dicts (``"type"`` and
            optional ``"device_name"`` keys are read)
        :param resource_path: first entry of each kit's ``"paths"``
        :param testcases_path: second entry of each kit's ``"paths"``
        :return: list of configured kit instances, in input order
        :raises ParamError: if no TEST_KIT plugin matches a kit's type
        """
        kit_instances = []
        for kit in kits:
            kit["paths"] = [resource_path, testcases_path]
            kit_type = kit.get("type", "")
            device_name = kit.get("device_name", None)
            # look the plugin up once instead of once to test and once to use
            plugins = get_plugin(plugin_type=Plugin.TEST_KIT, plugin_id=kit_type)
            if not plugins:
                raise ParamError(ErrorMessage.Common.Code_0101003.format(kit_type))
            # a new instance per kit, created from the registered plugin's class
            test_kit_instance = plugins[0].__class__()
            test_kit_instance.__check_config__(kit)
            setattr(test_kit_instance, "device_name", device_name)
            kit_instances.append(test_kit_instance)
        return kit_instances
880