• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import shutil
22import threading
23import time
24from ast import literal_eval
25from concurrent.futures import ThreadPoolExecutor
26from concurrent.futures import wait
27from xml.etree import ElementTree
28
29from _core.constants import ConfigConst
30from _core.constants import FilePermission
31from _core.constants import ModeType
32from _core.constants import ReportConst
33from _core.executor.request import Request
34from _core.logger import platform_logger
35from _core.logger import redirect_driver_log_begin
36from _core.logger import redirect_driver_log_end
37from _core.plugin import Config
38from _core.utils import calculate_elapsed_time
39from _core.utils import get_instance_name
40from _core.utils import check_mode
41from _core.utils import get_file_absolute_path
42from _core.utils import get_kit_instances
43from _core.utils import check_device_name
44from _core.utils import check_device_env_index
45from _core.exception import ParamError
46from _core.exception import ExecuteTerminate
47from _core.exception import DeviceError
48from _core.exception import LiteDeviceError
49from _core.report.reporter_helper import ReportConstant
50from _core.report.reporter_helper import DataHelper
51from _core.report.reporter_helper import Suite
52from _core.report.reporter_helper import Case
53from _core.testkit.json_parser import JsonParser
54
55LOG = platform_logger("Concurrent")
56
57
class Concurrent:
    """Helpers for running one function over many argument sets in threads."""

    @classmethod
    def executor_callback(cls, worker):
        """
        Done-callback that logs the exception of a finished worker, if any
        :param worker: the completed future
        :return:
        """
        failure = worker.exception()
        if not failure:
            return
        LOG.error("Worker return exception: {}".format(failure))

    @classmethod
    def concurrent_execute(cls, func, params_list, max_size=8):
        """
        Provide the ability to execute the target function concurrently
        :param func: target function
        :param params_list: iterable of positional-argument sequences, one
            per call of the target function
        :param max_size: the max number of threads wanted in the thread pool
        :return: list of (result, params) tuples, one per submitted call;
            reading a result re-raises the exception of a failed call
        """
        with ThreadPoolExecutor(max_size) as pool:
            submitted = {}
            for args in params_list:
                task = pool.submit(func, *args)
                submitted[task] = args
                task.add_done_callback(cls.executor_callback)
            # block until every submitted call has completed
            wait(submitted)
            return [(task.result(), args)
                    for task, args in submitted.items()]
85
86
87class DriversThread(threading.Thread):
88    def __init__(self, test_driver, task, environment, message_queue):
89        threading.Thread.__init__(self)
90        self.test_driver = test_driver
91        self.listeners = None
92        self.task = task
93        self.environment = environment
94        self.message_queue = message_queue
95        self.thread_id = None
96        self.error_message = ""
97        self.module_config_kits = None
98        self.start_time = time.time()
99
100    def set_listeners(self, listeners):
101        self.listeners = listeners
102        if self.environment is None:
103            return
104
105        for listener in listeners:
106            listener.device_sn = self.environment.devices[0].device_sn
107
108    def set_thread_id(self, thread_id):
109        self.thread_id = thread_id
110
111    def get_driver_log_file(self, test):
112        log_file = os.path.join(
113            self.task.config.log_path,
114            test.source.module_name, ReportConstant.module_run_log)
115        return log_file
116
117    def run(self):
118        from xdevice import Scheduler
119        driver, test = None, None
120        if self.test_driver and Scheduler.is_execute:
121            driver, test = self.test_driver
122        if driver is None or test is None:
123            return
124        redirect_driver_log_begin(self.ident, self.get_driver_log_file(test))
125        LOG.debug("Thread id: %s start" % self.thread_id)
126        execute_message = ExecuteMessage(
127            '', self.environment, self.test_driver, self.thread_id)
128        try:
129            # construct params
130            driver, test = self.test_driver
131            driver_request = self._get_driver_request(test, execute_message)
132            if driver_request is None:
133                return
134            # setup device
135            self._do_task_setup(driver_request)
136            # driver execute
137            self.reset_device(driver_request.config)
138            driver.__execute__(driver_request)
139        except Exception as exception:
140            error_no = getattr(exception, "error_no", "00000")
141            if self.environment is None:
142                LOG.exception("Exception: %s", exception, exc_info=False,
143                              error_no=error_no)
144            else:
145                LOG.exception(
146                    "Device: %s, exception: %s" % (
147                        self.environment.__get_serial__(), exception),
148                    exc_info=False, error_no=error_no)
149            self.error_message = "{}: {}".format(
150                get_instance_name(exception), str(exception))
151
152        finally:
153            self._do_common_module_kit_teardown()
154            self._handle_finally(driver, test, execute_message)
155        redirect_driver_log_end(self.ident)
156
157    @staticmethod
158    def reset_device(config):
159        if getattr(config, "reboot_per_module", False):
160            for device in config.environment.devices:
161                device.reboot()
162
163    @staticmethod
164    def update_report_xml(result_xml, props):
165        """update devices, start time, end time, etc. to the result file"""
166        if not os.path.exists(result_xml) or not props:
167            return
168        try:
169            root = ElementTree.parse(result_xml).getroot()
170        except ElementTree.ParseError as e:
171            LOG.error(f"parse result xml error! xml file {result_xml}")
172            LOG.error(f"error message: {e}")
173            return
174        for k, v in props.items():
175            if k == ReportConstant.devices:
176                v = literal_eval(str(v))
177            root.set(k, v)
178        result_fd = os.open(result_xml, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, FilePermission.mode_644)
179        with os.fdopen(result_fd, mode="w", encoding="utf-8") as result_file:
180            result_file.write(ElementTree.tostring(root).decode())
181
182    def _handle_finally(self, driver, test, execute_message):
183        from xdevice import Scheduler
184        source_content = test.source.source_file or test.source.source_string
185        end_time = time.time()
186        LOG.info("Executed: %s, Execution Time: %s" % (
187            source_content, calculate_elapsed_time(self.start_time, end_time)))
188
189        # inherit history report under retry mode
190        if driver and test:
191            execute_result = driver.__result__()
192
193            # update result xml
194            update_props = {
195                ReportConstant.start_time: time.strftime(
196                    ReportConstant.time_format, time.localtime(int(self.start_time))),
197                ReportConstant.end_time: time.strftime(
198                    ReportConstant.time_format, time.localtime(int(end_time))),
199                ReportConstant.test_type: test.source.test_type
200            }
201            if self.environment is not None:
202                update_props.update({ReportConstant.devices: self.environment.get_description()})
203            self.update_report_xml(execute_result, update_props)
204            LOG.debug("Execute result: %s" % execute_result)
205            if getattr(self.task.config, "history_report_path", ""):
206                execute_result = self._inherit_execute_result(
207                    execute_result, test)
208            execute_message.set_result(execute_result)
209
210        # set execute state
211        if self.error_message:
212            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
213        else:
214            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)
215
216        # free environment
217        if self.environment:
218            LOG.debug("Thread %s free environment",
219                      execute_message.get_thread_id())
220            Scheduler.__free_environment__(execute_message.get_environment())
221
222        LOG.debug("Put thread %s result", self.thread_id)
223        self.message_queue.put(execute_message)
224        LOG.info("")
225
226    def _do_common_module_kit_setup(self, driver_request):
227        for device in self.environment.devices:
228            setattr(device, ConfigConst.common_module_kits, [])
229        from xdevice import Scheduler
230        for kit in self.module_config_kits:
231            run_flag = False
232            for device in self.environment.devices:
233                if not Scheduler.is_execute:
234                    raise ExecuteTerminate()
235                if not check_device_env_index(device, kit):
236                    continue
237                if check_device_name(device, kit):
238                    run_flag = True
239                    kit_copy = copy.deepcopy(kit)
240                    module_kits = getattr(device, ConfigConst.common_module_kits)
241                    module_kits.append(kit_copy)
242                    kit_copy.__setup__(device, request=driver_request)
243            if not run_flag:
244                kit_device_name = getattr(kit, "device_name", None)
245                error_msg = "device name '%s' of '%s' not exist" % (
246                    kit_device_name, kit.__class__.__name__)
247                LOG.error(error_msg, error_no="00108")
248                raise ParamError(error_msg, error_no="00108")
249
250    def _do_common_module_kit_teardown(self):
251        try:
252            for device in self.environment.devices:
253                for kit in getattr(device, ConfigConst.common_module_kits, []):
254                    if check_device_name(device, kit, step="teardown"):
255                        kit.__teardown__(device)
256                setattr(device, ConfigConst.common_module_kits, [])
257        except Exception as e:
258            LOG.error("Common module kit teardown error: {}".format(e))
259
260    def _do_task_setup(self, driver_request):
261        if check_mode(ModeType.decc) or getattr(
262                driver_request.config, ConfigConst.check_device, False):
263            return
264
265        if self.environment is None:
266            return
267
268        if hasattr(driver_request.config, ConfigConst.module_config) and \
269                getattr(driver_request.config, ConfigConst.module_config, None):
270            module_config_path = getattr(driver_request.config, ConfigConst.module_config, None)
271            LOG.debug("Common module config path: {}".format(module_config_path))
272            from xdevice import Variables
273            config_path = get_file_absolute_path(module_config_path,
274                                                 [os.path.join(Variables.exec_dir, "config")])
275            json_config = JsonParser(config_path)
276            self.module_config_kits = get_kit_instances(json_config,
277                                                        driver_request.config.resource_path,
278                                                        driver_request.config.testcases_path)
279            self._do_common_module_kit_setup(driver_request)
280
281        from xdevice import Scheduler
282        for device in self.environment.devices:
283            if not getattr(device, ConfigConst.need_kit_setup, True):
284                LOG.debug("Device %s need kit setup is false" % device)
285                continue
286
287            # do task setup for device
288            kits_copy = copy.deepcopy(self.task.config.kits)
289            setattr(device, ConfigConst.task_kits, kits_copy)
290            for kit in getattr(device, ConfigConst.task_kits, []):
291                if not Scheduler.is_execute:
292                    break
293                try:
294                    kit.__setup__(device, request=driver_request)
295                except (ParamError, ExecuteTerminate, DeviceError,
296                        LiteDeviceError, ValueError, TypeError,
297                        SyntaxError, AttributeError) as exception:
298                    error_no = getattr(exception, "error_no", "00000")
299                    LOG.exception(
300                        "Task setup device: %s, exception: %s" % (
301                            self.environment.__get_serial__(),
302                            exception), exc_info=False, error_no=error_no)
303            LOG.debug("Set device %s need kit setup to false" % device)
304            setattr(device, ConfigConst.need_kit_setup, False)
305
306        # set product_info to self.task
307        if getattr(driver_request, ConfigConst.product_info, "") and not \
308                getattr(self.task, ConfigConst.product_info, ""):
309            product_info = getattr(driver_request, ConfigConst.product_info)
310            if not isinstance(product_info, dict):
311                LOG.warning("Product info should be dict, %s",
312                            product_info)
313                return
314            setattr(self.task, ConfigConst.product_info, product_info)
315
316    def _get_driver_request(self, root_desc, execute_message):
317        config = Config()
318        config.update(copy.deepcopy(self.task.config).__dict__)
319        config.environment = self.environment
320        if getattr(config, "history_report_path", ""):
321            # modify config.testargs
322            history_report_path = getattr(config, "history_report_path", "")
323            module_name = root_desc.source.module_name
324            unpassed_test_params = self._get_unpassed_test_params(
325                history_report_path, module_name)
326            if not unpassed_test_params:
327                LOG.info("%s all test cases are passed, no need retry",
328                         module_name)
329                driver_request = Request(self.thread_id, root_desc,
330                                         self.listeners, config)
331                execute_message.set_request(driver_request)
332                return None
333            if unpassed_test_params[0] != module_name and \
334                    unpassed_test_params[0] != str(module_name).split(".")[0]:
335                test_args = getattr(config, "testargs", {})
336                test_params = []
337                for unpassed_test_param in unpassed_test_params:
338                    if unpassed_test_param not in test_params:
339                        test_params.append(unpassed_test_param)
340                test_args["test"] = test_params
341                if "class" in test_args.keys():
342                    test_args.pop("class")
343                setattr(config, "testargs", test_args)
344        if getattr(config, "tf_suite", ""):
345            if root_desc.source.module_name in config.tf_suite.keys():
346                config.tf_suite = config.tf_suite.get(
347                    root_desc.source.module_name)
348            else:
349                config.tf_suite = dict()
350        for listener in self.listeners:
351            LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
352        driver_request = Request(self.thread_id, root_desc, self.listeners,
353                                 config)
354        execute_message.set_request(driver_request)
355        return driver_request
356
357    @classmethod
358    def _get_unpassed_test_params(cls, history_report_path, module_name):
359        unpassed_test_params = []
360        from _core.report.result_reporter import ResultReporter
361        params = ResultReporter.get_task_info_params(history_report_path)
362        if not params:
363            return unpassed_test_params
364        failed_list = []
365        try:
366            from devicetest.agent.decc import Handler
367            if Handler.DAV.retry_select:
368                for i in Handler.DAV.case_id_list:
369                    failed_list.append(i + "#" + i)
370            else:
371                failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
372        except Exception:
373            failed_list = params[ReportConst.unsuccessful_params].get(module_name, [])
374        if not failed_list:
375            failed_list = params[ReportConst.unsuccessful_params].get(str(module_name).split(".")[0], [])
376        unpassed_test_params.extend(failed_list)
377        LOG.debug("Get unpassed test params %s", unpassed_test_params)
378        return unpassed_test_params
379
380    @classmethod
381    def _append_unpassed_test_param(cls, history_report_file,
382                                    unpassed_test_params):
383
384        testsuites_element = DataHelper.parse_data_report(history_report_file)
385        for testsuite_element in testsuites_element:
386            suite_name = testsuite_element.get("name", "")
387            suite = Suite()
388            suite.set_cases(testsuite_element)
389            for case in suite.cases:
390                if case.is_passed():
391                    continue
392                unpassed_test_param = "{}#{}#{}".format(
393                    suite_name, case.classname, case.name)
394                unpassed_test_params.append(unpassed_test_param)
395
    def _inherit_execute_result(self, execute_result, root_desc):
        """
        Merge the history execute result of a module into the current one
        :param execute_result: result of the current run (as returned by the
            driver)
        :param root_desc: test descriptor providing source.module_name
        :return: execute_result, or the path of the copied history result
            when the current result file does not exist (non-decc mode)
        """
        module_name = root_desc.source.module_name
        execute_result_name = "%s.xml" % module_name
        history_execute_result = self._get_history_execute_result(
            execute_result_name)
        if not history_execute_result:
            LOG.warning("%s no history execute result exists",
                        execute_result_name)
            return execute_result

        if not check_mode(ModeType.decc):
            # no result file from this run: reuse the history result as-is by
            # copying it into <report_path>/result
            if not os.path.exists(execute_result):
                result_dir = \
                    os.path.join(self.task.config.report_path, "result")
                os.makedirs(result_dir, exist_ok=True)
                target_execute_result = os.path.join(result_dir,
                                                     execute_result_name)
                shutil.copyfile(history_execute_result, target_execute_result)
                LOG.info("Copy %s to %s" % (history_execute_result,
                                            target_execute_result))
                return target_execute_result

        # in decc mode this is the stored report result rather than a path
        real_execute_result = self._get_real_execute_result(execute_result)

        # inherit history execute result
        testsuites_element = DataHelper.parse_data_report(real_execute_result)
        if self._is_empty_report(testsuites_element):
            if check_mode(ModeType.decc):
                LOG.info("Empty report no need to inherit history execute"
                         " result")
            else:
                LOG.info("Empty report '%s' no need to inherit history execute"
                         " result", history_execute_result)
            return execute_result

        real_history_execute_result = self._get_real_history_execute_result(
            history_execute_result, module_name)

        history_testsuites_element = DataHelper.parse_data_report(
            real_history_execute_result)
        if self._is_empty_report(history_testsuites_element):
            LOG.info("History report '%s' is empty", history_execute_result)
            return execute_result
        if check_mode(ModeType.decc):
            LOG.info("Inherit history execute result")
        else:
            LOG.info("Inherit history execute result: %s",
                     history_execute_result)
        # testsuites_element is updated in place with the history suites/cases
        self._inherit_element(history_testsuites_element, testsuites_element)

        if check_mode(ModeType.decc):
            # decc mode hands the merged report to SuiteReporter as a string
            from xdevice import SuiteReporter
            SuiteReporter.append_report_result(
                (execute_result, DataHelper.to_string(testsuites_element)))
        else:
            # generate inherit execute result
            DataHelper.generate_report(testsuites_element, execute_result)
        return execute_result
454
455    def _inherit_element(self, history_testsuites_element, testsuites_element):
456        for history_testsuite_element in history_testsuites_element:
457            history_testsuite_name = history_testsuite_element.get("name", "")
458            target_testsuite_element = None
459            for testsuite_element in testsuites_element:
460                if history_testsuite_name == testsuite_element.get("name", ""):
461                    target_testsuite_element = testsuite_element
462                    break
463
464            if target_testsuite_element is None:
465                testsuites_element.append(history_testsuite_element)
466                inherited_test = int(testsuites_element.get(
467                    ReportConstant.tests, 0)) + int(
468                    history_testsuite_element.get(ReportConstant.tests, 0))
469                testsuites_element.set(ReportConstant.tests,
470                                       str(inherited_test))
471                continue
472
473            pass_num = 0
474            for history_testcase_element in history_testsuite_element:
475                if self._check_testcase_pass(history_testcase_element):
476                    target_testsuite_element.append(history_testcase_element)
477                    pass_num += 1
478
479            inherited_test = int(target_testsuite_element.get(
480                ReportConstant.tests, 0)) + pass_num
481            target_testsuite_element.set(ReportConstant.tests,
482                                         str(inherited_test))
483            inherited_test = int(testsuites_element.get(
484                ReportConstant.tests, 0)) + pass_num
485            testsuites_element.set(ReportConstant.tests, str(inherited_test))
486
487    def _get_history_execute_result(self, execute_result_name):
488        if execute_result_name.endswith(".xml"):
489            execute_result_name = execute_result_name[:-4]
490        history_execute_result = \
491            self._get_data_report_from_record(execute_result_name)
492        if history_execute_result:
493            return history_execute_result
494        for root_dir, _, files in os.walk(
495                self.task.config.history_report_path):
496            for result_file in files:
497                if result_file.endswith(execute_result_name):
498                    history_execute_result = os.path.abspath(
499                        os.path.join(root_dir, result_file))
500        return history_execute_result
501
502    @classmethod
503    def _check_testcase_pass(cls, history_testcase_element):
504        case = Case()
505        case.result = history_testcase_element.get(ReportConstant.result, "")
506        case.status = history_testcase_element.get(ReportConstant.status, "")
507        case.message = history_testcase_element.get(ReportConstant.message, "")
508        if len(history_testcase_element) > 0:
509            if not case.result:
510                case.result = ReportConstant.false
511            case.message = history_testcase_element[0].get(
512                ReportConstant.message)
513
514        return case.is_passed()
515
516    @classmethod
517    def _is_empty_report(cls, testsuites_element):
518        if len(testsuites_element) < 1:
519            return True
520        if len(testsuites_element) >= 2:
521            return False
522
523        if int(testsuites_element[0].get(ReportConstant.unavailable, 0)) > 0:
524            return True
525        return False
526
527    def _get_data_report_from_record(self, execute_result_name):
528        history_report_path = \
529            getattr(self.task.config, "history_report_path", "")
530        if history_report_path:
531            from _core.report.result_reporter import ResultReporter
532            params = ResultReporter.get_task_info_params(history_report_path)
533            if params:
534                report_data_dict = dict(params[ReportConst.data_reports])
535                if execute_result_name in report_data_dict.keys():
536                    return report_data_dict.get(execute_result_name)
537                elif execute_result_name.split(".")[0] in \
538                        report_data_dict.keys():
539                    return report_data_dict.get(
540                        execute_result_name.split(".")[0])
541        return ""
542
543    @classmethod
544    def _get_real_execute_result(cls, execute_result):
545        from xdevice import SuiteReporter
546        LOG.debug("Get real execute result length is: %s" %
547                  len(SuiteReporter.get_report_result()))
548        if check_mode(ModeType.decc):
549            for suite_report, report_result in \
550                    SuiteReporter.get_report_result():
551                if os.path.splitext(suite_report)[0] == \
552                        os.path.splitext(execute_result)[0]:
553                    return report_result
554            return ""
555        else:
556            return execute_result
557
558    @classmethod
559    def _get_real_history_execute_result(cls, history_execute_result,
560                                         module_name):
561        from xdevice import SuiteReporter
562        LOG.debug("Get real history execute result: %s" %
563                  SuiteReporter.history_report_result)
564        if check_mode(ModeType.decc):
565            virtual_report_path, report_result = SuiteReporter. \
566                get_history_result_by_module(module_name)
567            return report_result
568        else:
569            return history_execute_result
570
571
572class DriversDryRunThread(threading.Thread):
573    def __init__(self, test_driver, task, environment, message_queue):
574        threading.Thread.__init__(self)
575        self.test_driver = test_driver
576        self.listeners = None
577        self.task = task
578        self.environment = environment
579        self.message_queue = message_queue
580        self.thread_id = None
581        self.error_message = ""
582
583    def set_thread_id(self, thread_id):
584        self.thread_id = thread_id
585
586    def run(self):
587        from xdevice import Scheduler
588        LOG.debug("Thread id: %s start" % self.thread_id)
589        start_time = time.time()
590        execute_message = ExecuteMessage('', self.environment,
591                                         self.test_driver, self.thread_id)
592        driver, test = None, None
593        try:
594            if self.test_driver and Scheduler.is_execute:
595                # construct params
596                driver, test = self.test_driver
597                driver_request = self._get_driver_request(test,
598                                                          execute_message)
599                if driver_request is None:
600                    return
601
602                # setup device
603                self._do_task_setup(driver_request)
604
605                # driver execute
606                self.reset_device(driver_request.config)
607                driver.__dry_run_execute__(driver_request)
608
609        except Exception as exception:
610            error_no = getattr(exception, "error_no", "00000")
611            if self.environment is None:
612                LOG.exception("Exception: %s", exception, exc_info=False,
613                              error_no=error_no)
614            else:
615                LOG.exception(
616                    "Device: %s, exception: %s" % (
617                        self.environment.__get_serial__(), exception),
618                    exc_info=False, error_no=error_no)
619            self.error_message = "{}: {}".format(
620                get_instance_name(exception), str(exception))
621
622        finally:
623            self._handle_finally(driver, execute_message, start_time, test)
624
625    @staticmethod
626    def reset_device(config):
627        if getattr(config, "reboot_per_module", False):
628            for device in config.environment.devices:
629                device.reboot()
630
631    def _handle_finally(self, driver, execute_message, start_time, test):
632        from xdevice import Scheduler
633        source_content = (self.test_driver[1].source.source_file
634                          or self.test_driver[1].source.source_string)
635        LOG.info("Executed: %s, Execution Time: %s" % (
636            source_content, calculate_elapsed_time(start_time, time.time())))
637
638        # set execute state
639        if self.error_message:
640            execute_message.set_state(ExecuteMessage.DEVICE_ERROR)
641        else:
642            execute_message.set_state(ExecuteMessage.DEVICE_FINISH)
643
644        # free environment
645        if self.environment:
646            LOG.debug("Thread %s free environment",
647                      execute_message.get_thread_id())
648            Scheduler.__free_environment__(execute_message.get_environment())
649
650        LOG.debug("Put thread %s result", self.thread_id)
651        self.message_queue.put(execute_message)
652
653    def _do_task_setup(self, driver_request):
654        if check_mode(ModeType.decc) or getattr(
655                driver_request.config, ConfigConst.check_device, False):
656            return
657
658        if self.environment is None:
659            return
660
661        from xdevice import Scheduler
662        for device in self.environment.devices:
663            if not getattr(device, ConfigConst.need_kit_setup, True):
664                LOG.debug("Device %s need kit setup is false" % device)
665                continue
666
667            # do task setup for device
668            kits_copy = copy.deepcopy(self.task.config.kits)
669            setattr(device, ConfigConst.task_kits, kits_copy)
670            for kit in getattr(device, ConfigConst.task_kits, []):
671                if not Scheduler.is_execute:
672                    break
673                try:
674                    kit.__setup__(device, request=driver_request)
675                except (ParamError, ExecuteTerminate, DeviceError,
676                        LiteDeviceError, ValueError, TypeError,
677                        SyntaxError, AttributeError) as exception:
678                    error_no = getattr(exception, "error_no", "00000")
679                    LOG.exception(
680                        "Task setup device: %s, exception: %s" % (
681                            self.environment.__get_serial__(),
682                            exception), exc_info=False, error_no=error_no)
683            LOG.debug("Set device %s need kit setup to false" % device)
684            setattr(device, ConfigConst.need_kit_setup, False)
685
686        # set product_info to self.task
687        if getattr(driver_request, ConfigConst.product_info, "") and not \
688                getattr(self.task, ConfigConst.product_info, ""):
689            product_info = getattr(driver_request, ConfigConst.product_info)
690            if not isinstance(product_info, dict):
691                LOG.warning("Product info should be dict, %s",
692                            product_info)
693                return
694            setattr(self.task, ConfigConst.product_info, product_info)
695
696    def _get_driver_request(self, root_desc, execute_message):
697        config = Config()
698        config.update(copy.deepcopy(self.task.config).__dict__)
699        config.environment = self.environment
700        if self.listeners:
701            for listener in self.listeners:
702                LOG.debug("Thread id %s, listener %s" % (self.thread_id, listener))
703        driver_request = Request(self.thread_id, root_desc, self.listeners,
704                                 config)
705        execute_message.set_request(driver_request)
706        return driver_request
707
708
class QueueMonitorThread(threading.Thread):
    """Thread that consumes the ExecuteMessage objects put on the queue."""

    def __init__(self, message_queue, current_driver_threads, test_drivers):
        """
        :param message_queue: queue the execute messages are read from
        :param current_driver_threads: container of running driver threads
            keyed by thread id; entries are popped as their messages arrive
        :param test_drivers: collection of test drivers still pending;
            only its truthiness is checked here
        """
        super().__init__()
        self.message_queue = message_queue
        self.current_driver_threads = current_driver_threads
        self.test_drivers = test_drivers

    def run(self):
        """
        Consume messages until there are neither pending test drivers nor
        running driver threads, then report termination when the scheduler
        is no longer executing
        :return:
        """
        from xdevice import Scheduler
        LOG.debug("Queue monitor thread start")
        while self.test_drivers or self.current_driver_threads:
            if not self.current_driver_threads:
                # nothing is running yet, poll again later
                time.sleep(3)
                continue
            execute_message = self.message_queue.get()
            finished_id = execute_message.get_thread_id()

            self.current_driver_threads.pop(finished_id)

            state = execute_message.get_state()
            if state == ExecuteMessage.DEVICE_FINISH:
                LOG.debug("Thread id: %s execute finished" % finished_id)
            elif state == ExecuteMessage.DEVICE_ERROR:
                LOG.debug("Thread id: %s execute error" % finished_id)

            if Scheduler.upload_address:
                Scheduler.upload_module_result(execute_message)

        LOG.debug("Queue monitor thread end")
        if Scheduler.is_execute:
            return
        LOG.info("Terminate success")
        Scheduler.terminate_result.put("terminate success")
742
743
class ExecuteMessage:
    """Message a driver thread puts on the queue to report one execution."""

    # values used for the state field
    DEVICE_RUN = 'device_run'
    DEVICE_FINISH = 'device_finish'
    DEVICE_ERROR = 'device_error'

    def __init__(self, state, environment, drivers, thread_id):
        """
        :param state: initial state value
        :param environment: environment the execution ran on
        :param drivers: test driver(s) the message belongs to
        :param thread_id: id of the thread that produced the message
        """
        # fixed at construction time
        self.environment = environment
        self.drivers = drivers
        self.thread_id = thread_id
        # updated through the setters below
        self.state = state
        self.request = None
        self.result = None

    # --- state ---
    def get_state(self):
        """Return the current state value."""
        return self.state

    def set_state(self, state):
        """Replace the state value."""
        self.state = state

    # --- request ---
    def get_request(self):
        """Return the stored request, None until one is set."""
        return self.request

    def set_request(self, request):
        """Store the request."""
        self.request = request

    # --- result ---
    def get_result(self):
        """Return the stored result, None until one is set."""
        return self.result

    def set_result(self, result):
        """Store the result."""
        self.result = result

    # --- read-only context ---
    def get_environment(self):
        """Return the environment given at construction."""
        return self.environment

    def get_thread_id(self):
        """Return the thread id given at construction."""
        return self.thread_id

    def get_drivers(self):
        """Return the drivers given at construction."""
        return self.drivers
783