• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18
19import copy
20import datetime
21import os
22import queue
23import time
24import uuid
25import shutil
26import json
27from xml.etree import ElementTree
28
29from _core.utils import unique_id
30from _core.utils import check_mode
31from _core.utils import get_sub_path
32from _core.utils import get_filename_extension
33from _core.utils import convert_serial
34from _core.utils import convert_mac
35from _core.utils import get_instance_name
36from _core.utils import is_config_str
37from _core.utils import check_result_report
38from _core.utils import get_cst_time
39from _core.environment.manager_env import EnvironmentManager
40from _core.environment.manager_env import DeviceSelectionOption
41from _core.exception import ParamError
42from _core.exception import ExecuteTerminate
43from _core.exception import LiteDeviceError
44from _core.exception import DeviceError
45from _core.interface import LifeCycle
46from _core.executor.request import Request
47from _core.executor.request import Descriptor
48from _core.plugin import get_plugin
49from _core.plugin import Plugin
50from _core.plugin import Config
51from _core.report.reporter_helper import ExecInfo
52from _core.report.reporter_helper import ReportConstant
53from _core.report.reporter_helper import Case
54from _core.report.reporter_helper import DataHelper
55from _core.constants import TestExecType
56from _core.constants import CKit
57from _core.constants import ModeType
58from _core.constants import DeviceLabelType
59from _core.constants import SchedulerType
60from _core.constants import ListenerType
61from _core.constants import ConfigConst
62from _core.constants import ReportConst
63from _core.constants import HostDrivenTestType
64from _core.constants import LifeStage
65from _core.executor.concurrent import DriversThread
66from _core.executor.concurrent import QueueMonitorThread
67from _core.executor.concurrent import DriversDryRunThread
68from _core.executor.source import TestSetSource
69from _core.executor.source import find_test_descriptors
70from _core.executor.source import find_testdict_descriptors
71from _core.executor.source import TestDictSource
72from _core.logger import platform_logger
73from _core.logger import add_task_file_handler
74from _core.logger import remove_task_file_handler
75from _core.logger import add_encrypt_file_handler
76from _core.logger import remove_encrypt_file_handler
77
# Only the Scheduler plugin class is exported from this module.
__all__ = ["Scheduler"]
# Module-level logger used by every method of the Scheduler class.
LOG = platform_logger("Scheduler")

# NOTE(review): not referenced in the part of the file visible here;
# presumably a cap on the length of displayed text - confirm at its use site.
MAX_VISIBLE_LENGTH = 150
82
83
84@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
85class Scheduler(object):
86    """
87    The Scheduler is the main entry point for client code that wishes to
88    discover and execute tests.
89    """
    # factory params
    # NOTE: everything below is a class attribute; the mutable ones
    # (terminate_result, command_queue, device_labels) are therefore shared
    # by all Scheduler instances.
    # is_execute is set to True at the start of __execute__; the execution
    # loops stop scheduling new drivers once it is False.
    is_execute = True
    terminate_result = queue.Queue()
    # when non-empty, __execute__ uploads the task result at task end
    upload_address = ""
    task_type = ""
    task_name = ""
    # compared with ModeType.decc in _clear_not_executed
    mode = ""
    proxy = None

    # command_queue to store test commands
    command_queue = []
    # __execute__ drops the oldest entry once command_queue grows past this
    max_command_num = 50
    # the number of tests in current task
    test_number = 0
    device_labels = []
    # set by __execute__ to the configured repeat count, or 0 without repeat
    repeat_index = 0
    auto_retry = -1
    is_need_auto_retry = False

    # thread returned by _start_queue_monitor; the execution methods poll
    # its is_alive() to learn when all drivers have finished
    queue_monitor_thread = None
110
111    def __discover__(self, args):
112        """Discover task to execute"""
113        from _core.executor.request import Task
114        config = Config()
115        config.update(args)
116        task = Task(drivers=[])
117        task.init(config)
118
119        Scheduler.call_life_stage_action(stage=LifeStage.task_start)
120
121        root_descriptor = self._find_test_root_descriptor(task.config)
122        task.set_root_descriptor(root_descriptor)
123        return task
124
    def __execute__(self, task):
        """
        Execute a discovered task and always run the end-of-task handling.

        Exceptions of the types listed in the except clause are logged and
        turned into error_message instead of being re-raised; the finally
        block then restores the environment (when one was configured), fires
        the task_end life stage and, if an upload address is set, uploads
        the result.

        Args:
            task: the task to run; its config, root descriptor and
                test_drivers are read here
        """
        error_message = ""
        unavailable = 0
        try:
            Scheduler.is_execute = True
            if Scheduler.command_queue:
                LOG.debug("Run command: {}".format(convert_mac(Scheduler.command_queue[-1])))
                # replace the newest raw command by a
                # (task_id, command, report_path) record
                run_command = Scheduler.command_queue.pop()
                task_id = str(uuid.uuid1()).split("-")[0]
                Scheduler.command_queue.append((task_id, run_command,
                                                task.config.report_path))
                # keep the history bounded by dropping the oldest record
                if len(Scheduler.command_queue) > self.max_command_num:
                    Scheduler.command_queue.pop(0)

            # a test environment takes precedence over a config file
            if getattr(task.config, ConfigConst.test_environment, ""):
                self._reset_environment(task.config.get(
                    ConfigConst.test_environment, ""))
            elif getattr(task.config, ConfigConst.configfile, ""):
                self._reset_environment(config_file=task.config.get(
                    ConfigConst.configfile, ""))

            # when a repeat count is configured, rebuild the driver list from
            # the lists returned by construct_repeat_list for every index
            if getattr(task.config, ConfigConst.repeat, 0) > 0:
                Scheduler.repeat_index = \
                    getattr(task.config, ConfigConst.repeat)
                drivers_list = list()
                # index starts at -1, so construct_repeat_list is called
                # repeat + 1 times
                for index in range(-1, task.config.repeat):
                    repeat_list = self.construct_repeat_list(task, index)
                    if repeat_list:
                        drivers_list.extend(repeat_list)
                task.test_drivers = drivers_list
            else:
                Scheduler.repeat_index = 0

            self.test_number = len(task.test_drivers)
            # collect the errors of all root children; each message is
            # prepended, separated by ";"
            for des in task.root.children:
                if des.error:
                    error_message = "{};{}".format(des.error.error_msg, error_message)
                    unavailable += 1
            if error_message != "":
                error_message = "test source '{}' or its config json not exists".format(error_message)
                LOG.error("Exec task error: {}".format(error_message))

                # handled by the except clause below, so nothing is executed
                raise ParamError(error_message)

            if task.config.exectype == TestExecType.device_test:
                self._device_test_execute(task)
            elif task.config.exectype == TestExecType.host_test:
                self._host_test_execute(task)
            else:
                LOG.info("Exec type %s is bypassed" % task.config.exectype)

        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
            # append the error number to the message when the exception
            # carries one; "00000" is only used for the log record
            error_no = getattr(exception, "error_no", "")
            error_message = "%s[%s]" % (str(exception), error_no) \
                if error_no else str(exception)
            error_no = error_no if error_no else "00000"
            LOG.exception(exception, exc_info=False, error_no=error_no)
        finally:
            Scheduler.reset_test_dict_source()
            # mirror of the _reset_environment call made in the try block
            if getattr(task.config, ConfigConst.test_environment, "") or \
                    getattr(task.config, ConfigConst.configfile, ""):
                self._restore_environment()

            Scheduler.call_life_stage_action(stage=LifeStage.task_end,
                                             task=task, error=error_message,
                                             unavailable=unavailable)
            if Scheduler.upload_address:
                Scheduler.upload_task_result(task, error_message)
                Scheduler.upload_report_end()
196
197    def _device_test_execute(self, task):
198        used_devices = {}
199        try:
200            self._dynamic_concurrent_execute(task, used_devices)
201        finally:
202            Scheduler.__reset_environment__(used_devices)
203            # generate reports
204            self._generate_task_report(task, used_devices)
205
    def _host_test_execute(self, task):
        """
        Execute host test

        Starts one driver thread per entry of task.test_drivers without
        allocating any device (None is passed as environment), waits until
        the queue monitor thread has ended and finally generates the task
        report, even when an exception was raised.
        """
        try:
            # initial params
            current_driver_threads = {}
            test_drivers = task.test_drivers
            message_queue = queue.Queue()

            # execute test drivers
            self.queue_monitor_thread = self._start_queue_monitor(
                message_queue, test_drivers, current_driver_threads)
            while test_drivers:
                # throttle: no new driver is started while more than 5
                # driver threads are registered
                # NOTE(review): entries are not removed from
                # current_driver_threads in this method - presumably the
                # queue monitor does that; confirm
                if len(current_driver_threads) > 5:
                    time.sleep(3)
                    continue

                # clear remaining test drivers when scheduler is terminated
                if not Scheduler.is_execute:
                    LOG.info("Clear test drivers")
                    self._clear_not_executed(task, test_drivers)
                    break

                # get test driver and device
                test_driver = test_drivers[0]

                # display executing progress
                self._display_executing_process(None, test_driver,
                                                test_drivers)

                # start driver thread
                self._start_driver_thread(current_driver_threads, (
                    None, message_queue, task, test_driver))
                # the driver is only removed once its thread has been started
                test_drivers.pop(0)

            # wait for all drivers threads finished and do kit teardown
            while True:
                if not self.queue_monitor_thread.is_alive():
                    break
                time.sleep(3)

        finally:
            # generate reports
            self._generate_task_report(task)
249
    def _dry_run_device_test_execute(self, task):
        """
        Run every test driver of the task in a DriversDryRunThread and
        remove the task's report directory afterwards.

        An environment is allocated for each driver before its thread is
        started. The finally block stops the task logcat and the encrypt
        log and deletes task.config.report_path.
        """
        try:
            # initial params
            # used_devices and task_unused_env are never filled in this
            # method; they are only handed to _do_taskkit_teardown below
            used_devices = {}
            current_driver_threads = {}
            test_drivers = task.test_drivers
            message_queue = queue.Queue()
            task_unused_env = []

            # execute test drivers
            self.queue_monitor_thread = self._start_queue_monitor(
                message_queue, test_drivers, current_driver_threads)
            while test_drivers:
                # clear remaining test drivers when scheduler is terminated
                if not Scheduler.is_execute:
                    LOG.info("Clear test drivers")
                    self._clear_not_executed(task, test_drivers)
                    break

                # get test driver and device
                test_driver = test_drivers[0]
                # get environment
                try:
                    environment = self.__allocate_environment__(
                        task.config.__dict__, test_driver)
                except DeviceError as exception:
                    # _handle_device_error reports the driver as not
                    # executed and pops it from test_drivers
                    self._handle_device_error(exception, task, test_drivers)
                    continue

                # terminated while waiting for devices: give the environment
                # back and let the check at the loop head clear the rest
                if not Scheduler.is_execute:
                    if environment:
                        Scheduler.__free_environment__(environment)
                    continue

                # start driver thread
                thread_id = self._get_thread_id(current_driver_threads)
                driver_thread = DriversDryRunThread(test_driver, task,
                                                    environment,
                                                    message_queue)
                # NOTE(review): if DriversDryRunThread derives from
                # threading.Thread, setDaemon() is deprecated since
                # Python 3.10 in favour of the daemon attribute - confirm
                driver_thread.setDaemon(True)
                driver_thread.set_thread_id(thread_id)
                driver_thread.start()
                current_driver_threads.setdefault(thread_id, driver_thread)

                test_drivers.pop(0)

            # wait for all drivers threads finished and do kit teardown
            while True:
                if not self.queue_monitor_thread.is_alive():
                    break
                time.sleep(3)

            self._do_taskkit_teardown(used_devices, task_unused_env)
        finally:
            LOG.debug(
                "Removing report_path: {}".format(task.config.report_path))
            # delete reports
            self.stop_task_logcat()
            self.stop_encrypt_log()
            # NOTE(review): rmtree raises when the path does not exist,
            # which would replace an exception raised in the try block
            shutil.rmtree(task.config.report_path)
310
311    def _generate_task_report(self, task, used_devices=None):
312        task_info = ExecInfo()
313        test_type = getattr(task.config, "testtype", [])
314        task_name = getattr(task.config, "task", "")
315        if task_name:
316            task_info.test_type = str(task_name).upper()
317        else:
318            task_info.test_type = ",".join(test_type) if test_type else "Test"
319        if used_devices:
320            serials = []
321            platforms = []
322            test_labels = []
323            for serial, device in used_devices.items():
324                serials.append(convert_serial(serial))
325                platform = str(device.label).capitalize()
326                test_label = str(device.label).capitalize()
327                if platform not in platforms:
328                    platforms.append(platform)
329                if test_label not in test_labels:
330                    test_labels.append(test_label)
331            task_info.device_name = ",".join(serials)
332            task_info.platform = ",".join(platforms)
333            task_info.device_label = ",".join(test_labels)
334        else:
335            task_info.device_name = "None"
336            task_info.platform = "None"
337            task_info.device_label = "None"
338        task_info.test_time = task.config.start_time
339        task_info.product_info = getattr(task, "product_info", "")
340
341        listeners = self._create_listeners(task)
342        for listener in listeners:
343            listener.__ended__(LifeCycle.TestTask, task_info,
344                               test_type=task_info.test_type, task=task)
345
346    @classmethod
347    def _create_listeners(cls, task):
348        listeners = []
349        # append log listeners
350        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
351        for log_listener in log_listeners:
352            log_listener_instance = log_listener.__class__()
353            listeners.append(log_listener_instance)
354        # append report listeners
355        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
356        for report_listener in report_listeners:
357            report_listener_instance = report_listener.__class__()
358            setattr(report_listener_instance, "report_path",
359                    task.config.report_path)
360            listeners.append(report_listener_instance)
361        # append upload listeners
362        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
363        for upload_listener in upload_listeners:
364            upload_listener_instance = upload_listener.__class__()
365            listeners.append(upload_listener_instance)
366        return listeners
367
368    @staticmethod
369    def _find_device_options(environment_config, options, test_source):
370        devices_option = []
371        index = 1
372        for device_dict in environment_config:
373            label = device_dict.get("label", "")
374            required_manager = device_dict.get("type", "device")
375            required_manager = \
376                required_manager if required_manager else "device"
377            if not label:
378                continue
379            device_option = DeviceSelectionOption(options, label, test_source)
380            device_dict.pop("type", None)
381            device_dict.pop("label", None)
382            device_option.required_manager = required_manager
383            device_option.extend_value = device_dict
384            device_option.source_file = \
385                test_source.config_file or test_source.source_string
386            if hasattr(device_option, "env_index"):
387                device_option.env_index = index
388            index += 1
389            devices_option.append(device_option)
390        return devices_option
391
392    def __allocate_environment__(self, options, test_driver):
393        device_options = self.get_device_options(options,
394                                                 test_driver[1].source)
395        environment = None
396        env_manager = EnvironmentManager()
397        while True:
398            if not Scheduler.is_execute:
399                break
400            if self.queue_monitor_thread and \
401                    not self.queue_monitor_thread.is_alive():
402                LOG.error("Queue monitor thread is dead.")
403                break
404            environment = env_manager.apply_environment(device_options)
405            if len(environment.devices) == len(device_options):
406                return environment
407            else:
408                env_manager.release_environment(environment)
409                LOG.debug("'%s' is waiting available device",
410                          test_driver[1].source.test_name)
411                if env_manager.check_device_exist(device_options):
412                    continue
413                else:
414                    LOG.debug("'%s' required %s devices, actually %s devices"
415                              " were found" % (test_driver[1].source.test_name,
416                                               len(device_options),
417                                               len(environment.devices)))
418                    raise DeviceError("The '%s' required device does not exist"
419                                      % test_driver[1].source.source_file,
420                                      error_no="00104")
421
422        return environment
423
424    @classmethod
425    def get_device_options(cls, options, test_source):
426        device_options = []
427        config_file = test_source.config_file
428        environment_config = []
429        from _core.testkit.json_parser import JsonParser
430        if test_source.source_string and is_config_str(
431                test_source.source_string):
432            json_config = JsonParser(test_source.source_string)
433            environment_config = json_config.get_environment()
434            device_options = cls._find_device_options(
435                environment_config, options, test_source)
436        elif config_file and os.path.exists(config_file):
437            json_config = JsonParser(test_source.config_file)
438            environment_config = json_config.get_environment()
439            device_options = cls._find_device_options(
440                environment_config, options, test_source)
441
442        device_options = cls._calculate_device_options(
443            device_options, environment_config, options, test_source)
444
445        if ConfigConst.component_mapper in options.keys():
446            required_component = options.get(ConfigConst.component_mapper). \
447                get(test_source.module_name, None)
448            for device_option in device_options:
449                device_option.required_component = required_component
450        return device_options
451
452    @staticmethod
453    def __free_environment__(environment):
454        env_manager = EnvironmentManager()
455        env_manager.release_environment(environment)
456
457    @staticmethod
458    def __reset_environment__(used_devices):
459        env_manager = EnvironmentManager()
460        env_manager.reset_environment(used_devices)
461
462    @classmethod
463    def _check_device_spt(cls, kit, driver_request, device):
464        kit_spt = cls._parse_property_value(ConfigConst.spt,
465                                            driver_request, kit)
466        if not kit_spt:
467            setattr(device, ConfigConst.task_state, False)
468            LOG.error("Spt is empty", error_no="00108")
469            return
470        if getattr(driver_request, ConfigConst.product_info, ""):
471            product_info = getattr(driver_request,
472                                   ConfigConst.product_info)
473            if not isinstance(product_info, dict):
474                LOG.warning("Product info should be dict, %s",
475                            product_info)
476                setattr(device, ConfigConst.task_state, False)
477                return
478            device_spt = product_info.get("Security Patch", None)
479            if not device_spt or not \
480                    Scheduler.compare_spt_time(kit_spt, device_spt):
481                LOG.error("The device %s spt is %s, "
482                          "and the test case spt is %s, "
483                          "which does not meet the requirements" %
484                          (device.device_sn, device_spt, kit_spt),
485                          error_no="00116")
486                setattr(device, ConfigConst.task_state, False)
487                return
488
    def _decc_task_setup(self, environment, task):
        """
        Run the task kits on every device of environment that still needs
        its kit setup, and copy the product info to the task.

        Args:
            environment: allocated environment; may be None
            task: the running task; config.kits is deep-copied per device
        Returns:
            False when environment is None or when any device has its
            task_state attribute set to False, True otherwise
        """
        # the request is built before the None check, so a None environment
        # is stored in config before returning
        config = Config()
        config.update(task.config.__dict__)
        config.environment = environment
        driver_request = Request(config=config)

        if environment is None:
            return False

        for device in environment.devices:
            # devices already set up earlier carry need_kit_setup == False
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device
            # each device gets its own copy of the kits
            kits_copy = copy.deepcopy(task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    # a failing kit is logged; the remaining kits still run
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
                # the spt check runs even when the kit setup above failed
                if kit.__class__.__name__ == CKit.query and \
                        device.label in [DeviceLabelType.ipcamera]:
                    self._check_device_spt(kit, driver_request, device)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # _check_device_spt marks a rejected device with task_state False
        for device in environment.devices:
            if not getattr(device, ConfigConst.task_state, True):
                return False

        # set product_info to self.task
        # only done once: skipped when the task already has product info
        if getattr(driver_request, ConfigConst.product_info, "") and \
                not getattr(task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
            else:
                setattr(task, ConfigConst.product_info, product_info)
        return True
539
    def _dynamic_concurrent_execute(self, task, used_devices):
        """
        Schedule every test driver of the task on dynamically allocated
        devices and wait until all of them have finished.

        For each driver an environment is allocated, optionally checked
        with _decc_task_setup, recorded in used_devices and handed to a new
        driver thread. Drivers that cannot run are reported as not executed.
        After the queue monitor thread has ended, the task kits are torn
        down.

        Args:
            task: the running task; test_drivers is consumed from the front
            used_devices: dict filled through _append_used_devices; the
                caller uses it afterwards for cleanup and reporting
        """
        # initial params
        current_driver_threads = {}
        test_drivers = task.test_drivers
        message_queue = queue.Queue()
        # environments released again because the device check failed
        task_unused_env = []

        # execute test drivers
        self.queue_monitor_thread = self._start_queue_monitor(
            message_queue, test_drivers, current_driver_threads)
        while test_drivers:
            # clear remaining test drivers when scheduler is terminated
            if not Scheduler.is_execute:
                LOG.info("Clear test drivers")
                self._clear_not_executed(task, test_drivers)
                break

            # get test driver and device
            test_driver = test_drivers[0]

            # call life stage
            Scheduler.call_life_stage_action(stage=LifeStage.case_start,
                                             case_name=test_driver[1].source.module_name)

            # retry of a former run: modules that need no retry are not
            # executed again, their history result is appended instead
            if getattr(task.config, ConfigConst.history_report_path, ""):
                module_name = test_driver[1].source.module_name
                if not self.is_module_need_retry(task, module_name):
                    self._display_executing_process(None, test_driver,
                                                    test_drivers)
                    LOG.info("%s are passed, no need to retry" % module_name)
                    self._append_history_result(task, module_name)
                    LOG.info("")
                    test_drivers.pop(0)
                    continue

            if getattr(task.config, ConfigConst.component_mapper, ""):
                module_name = test_driver[1].source.module_name
                self.component_task_setup(task, module_name)

            # get environment
            try:
                environment = self.__allocate_environment__(
                    task.config.__dict__, test_driver)
            except DeviceError as exception:
                # _handle_device_error reports the driver as not executed
                # and pops it; test_driver still refers to that driver
                self._handle_device_error(exception, task, test_drivers)
                Scheduler.call_life_stage_action(stage=LifeStage.case_end,
                                                 case_name=test_driver[1].source.module_name,
                                                 case_result="Failed",
                                                 error_msg=exception.args)
                continue

            # the monitor thread ended while drivers are still pending:
            # start a new one with a fresh queue and thread registry and
            # retry the same driver
            # NOTE(review): an environment allocated just above is not
            # released on this path before `continue` - confirm whether it
            # can be non-None here
            if not self.queue_monitor_thread.is_alive():
                LOG.debug("Restart queue monitor thread.")
                current_driver_threads = {}
                message_queue = queue.Queue()
                self.queue_monitor_thread = self._start_queue_monitor(
                    message_queue, test_drivers, current_driver_threads)
                continue

            # terminated while waiting for devices: give the environment
            # back and let the check at the loop head clear the rest
            if not Scheduler.is_execute:
                if environment:
                    Scheduler.__free_environment__(environment)
                continue

            # device check, done in decc mode or when check_device is set
            if check_mode(ModeType.decc) or getattr(
                    task.config, ConfigConst.check_device, False):
                LOG.info("Start to check environment: %s" %
                         environment.__get_serial__())
                status = self._decc_task_setup(environment, task)
                if not status:
                    # the check failed: release the devices and report the
                    # driver as not executed
                    Scheduler.__free_environment__(environment)
                    task_unused_env.append(environment)
                    error_message = "Load Error[00116]"
                    self.report_not_executed(task.config.report_path,
                                             [test_drivers[0]],
                                             error_message, task)
                    test_drivers.pop(0)
                    continue
                else:
                    LOG.info("Environment %s check success",
                             environment.__get_serial__())

            # display executing progress
            self._display_executing_process(environment, test_driver,
                                            test_drivers)

            # add to used devices and set need_kit_setup attribute
            self._append_used_devices(environment, used_devices)

            # start driver thread
            self._start_driver_thread(current_driver_threads, (
                environment, message_queue, task, test_driver))
            test_drivers.pop(0)

        # wait for all drivers threads finished and do kit teardown
        while True:
            if not self.queue_monitor_thread.is_alive():
                break
            time.sleep(3)

        self._do_taskkit_teardown(used_devices, task_unused_env)
641
642    @classmethod
643    def _append_history_result(cls, task, module_name):
644        history_report_path = getattr(
645            task.config, ConfigConst.history_report_path, "")
646        from _core.report.result_reporter import ResultReporter
647        params = ResultReporter.get_task_info_params(
648            history_report_path)
649
650        if not params or not params[ReportConst.data_reports]:
651            LOG.debug("Task info record data reports is empty")
652            return
653
654        report_data_dict = dict(params[ReportConst.data_reports])
655        if module_name not in report_data_dict.keys():
656            module_name_ = str(module_name).split(".")[0]
657            if module_name_ not in report_data_dict.keys():
658                LOG.error("%s not in data reports" % module_name)
659                return
660            module_name = module_name_
661
662        from xdevice import SuiteReporter
663        if check_mode(ModeType.decc):
664            virtual_report_path, report_result = SuiteReporter. \
665                get_history_result_by_module(module_name)
666            LOG.debug("Append history result: (%s, %s)" % (
667                virtual_report_path, report_result))
668            SuiteReporter.append_report_result(
669                (virtual_report_path, report_result))
670        else:
671            history_execute_result = report_data_dict.get(module_name, "")
672            LOG.info("Start copy %s" % history_execute_result)
673            file_name = get_filename_extension(history_execute_result)[0]
674            if os.path.exists(history_execute_result):
675                result_dir = \
676                    os.path.join(task.config.report_path, "result")
677                os.makedirs(result_dir, exist_ok=True)
678                target_execute_result = "%s.xml" % os.path.join(
679                    task.config.report_path, "result", file_name)
680                shutil.copyfile(history_execute_result, target_execute_result)
681                LOG.info("Copy %s to %s" % (
682                    history_execute_result, target_execute_result))
683            else:
684                error_msg = "Copy failed! %s not exists!" % \
685                            history_execute_result
686                raise ParamError(error_msg)
687
688    def _handle_device_error(self, exception, task, test_drivers):
689        self._display_executing_process(None, test_drivers[0], test_drivers)
690        error_message = "%s: %s" % \
691                        (get_instance_name(exception), exception)
692        LOG.exception(error_message, exc_info=False,
693                      error_no=exception.error_no)
694        if check_mode(ModeType.decc):
695            error_message = "Load Error[00104]"
696        self.report_not_executed(task.config.report_path, [test_drivers[0]],
697                                 error_message, task)
698
699        LOG.info("")
700        test_drivers.pop(0)
701
702    @classmethod
703    def _clear_not_executed(cls, task, test_drivers):
704        if Scheduler.mode != ModeType.decc:
705            # clear all
706            test_drivers.clear()
707            return
708        # The result is reported only in DECC mode, and also clear all.
709        LOG.error("Case no run: task execution terminated!", error_no="00300")
710        error_message = "Execute Terminate[00300]"
711        cls.report_not_executed(task.config.report_path, test_drivers,
712                                error_message)
713        test_drivers.clear()
714
715    @classmethod
716    def report_not_executed(cls, report_path, test_drivers, error_message,
717                            task=None):
718        # traversing list to get remained elements
719        for test_driver in test_drivers:
720            # get report file
721            if task and getattr(task.config, "testdict", ""):
722                report_file = os.path.join(get_sub_path(
723                    test_driver[1].source.source_file),
724                    "%s.xml" % test_driver[1].source.test_name)
725            else:
726                report_file = os.path.join(
727                    report_path, "result",
728                    "%s.xml" % test_driver[1].source.module_name)
729
730            # get report name
731            report_name = test_driver[1].source.test_name if \
732                not test_driver[1].source.test_name.startswith("{") \
733                else "report"
734
735            # get module name
736            module_name = test_driver[1].source.module_name
737
738            # here, normally create empty report and then upload result
739            check_result_report(report_path, report_file, error_message,
740                                report_name, module_name)
741
742    def _start_driver_thread(self, current_driver_threads, thread_params):
743        environment, message_queue, task, test_driver = thread_params
744        thread_id = self._get_thread_id(current_driver_threads)
745        driver_thread = DriversThread(test_driver, task, environment,
746                                      message_queue)
747        driver_thread.setDaemon(True)
748        driver_thread.set_thread_id(thread_id)
749        driver_thread.set_listeners(self._create_listeners(task))
750        driver_thread.start()
751        current_driver_threads.setdefault(thread_id, driver_thread)
752        LOG.info(f"Driver executing in thread {driver_thread.ident}")
753        LOG.info(f"Thread id: {thread_id} execute started")
754
755    @classmethod
756    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
757        for device in used_devices.values():
758            if getattr(device, ConfigConst.need_kit_setup, True):
759                continue
760
761            for kit in getattr(device, ConfigConst.task_kits, []):
762                try:
763                    kit.__teardown__(device)
764                except Exception as error:
765                    LOG.debug("Do task kit teardown: %s" % error)
766            setattr(device, ConfigConst.task_kits, [])
767            setattr(device, ConfigConst.need_kit_setup, True)
768
769        for environment in task_unused_env:
770            for device in environment.devices:
771                setattr(device, ConfigConst.task_state, True)
772                setattr(device, ConfigConst.need_kit_setup, True)
773
774    def _display_executing_process(self, environment, test_driver,
775                                   test_drivers):
776        source_content = test_driver[1].source.source_file or \
777                         test_driver[1].source.source_string
778        if environment is None:
779            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
780                     (self.test_number - len(test_drivers) + 1,
781                      self.test_number, source_content,
782                      test_driver[1].source.test_type))
783            return
784
785        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
786                 (self.test_number - len(test_drivers) + 1,
787                  self.test_number, source_content,
788                  environment.__get_serial__(),
789                  test_driver[1].source.test_type))
790
791    @classmethod
792    def _get_thread_id(cls, current_driver_threads):
793        thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
794        while thread_id in current_driver_threads.keys():
795            thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
796        return thread_id
797
798    @classmethod
799    def _append_used_devices(cls, environment, used_devices):
800        if environment is not None:
801            for device in environment.devices:
802                device_serial = device.__get_serial__() if device else "None"
803                if device_serial and device_serial not in used_devices.keys():
804                    used_devices[device_serial] = device
805
806    @staticmethod
807    def _start_queue_monitor(message_queue, test_drivers,
808                             current_driver_threads):
809        queue_monitor_thread = QueueMonitorThread(message_queue,
810                                                  current_driver_threads,
811                                                  test_drivers)
812        queue_monitor_thread.setDaemon(True)
813        queue_monitor_thread.start()
814        return queue_monitor_thread
815
816    def exec_command(self, command, options):
817        """
818        Directly executes a command without adding it to the command queue.
819        """
820        if command != "run":
821            raise ParamError("unsupported command action: %s" % command,
822                             error_no="00100")
823        exec_type = options.exectype
824        if exec_type in [TestExecType.device_test, TestExecType.host_test,
825                         TestExecType.host_driven_test]:
826            self._exec_task(options)
827        else:
828            LOG.error("Unsupported execution type '%s'" % exec_type,
829                      error_no="00100")
830
831        return
832
833    def _exec_task(self, options):
834        """
835        Directly allocates a device and execute a device test.
836        """
837        try:
838            self.check_auto_retry(options)
839            task = self.__discover__(options.__dict__)
840            self.__execute__(task)
841        except (ParamError, ValueError, TypeError, SyntaxError,
842                AttributeError) as exception:
843            error_no = getattr(exception, "error_no", "00000")
844            LOG.exception("%s: %s" % (get_instance_name(exception), exception),
845                          exc_info=False, error_no=error_no)
846            if Scheduler.upload_address:
847                Scheduler.upload_unavailable_result(str(exception.args))
848                Scheduler.upload_report_end()
849        finally:
850            self.stop_task_logcat()
851            self.stop_encrypt_log()
852            self.start_auto_retry()
853
854    @classmethod
855    def _reset_environment(cls, environment="", config_file=""):
856        env_manager = EnvironmentManager()
857        env_manager.env_stop()
858        EnvironmentManager(environment, config_file)
859
860    @classmethod
861    def _restore_environment(cls):
862        env_manager = EnvironmentManager()
863        env_manager.env_stop()
864        EnvironmentManager()
865
866    @classmethod
867    def start_task_log(cls, log_path):
868        tool_file_name = "task_log.log"
869        tool_log_file = os.path.join(log_path, tool_file_name)
870        add_task_file_handler(tool_log_file)
871
872    @classmethod
873    def start_encrypt_log(cls, log_path):
874        from _core.report.encrypt import check_pub_key_exist
875        if check_pub_key_exist():
876            encrypt_file_name = "task_log.ept"
877            encrypt_log_file = os.path.join(log_path, encrypt_file_name)
878            add_encrypt_file_handler(encrypt_log_file)
879
    @classmethod
    def stop_task_logcat(cls):
        """Call ``remove_task_file_handler()``.

        NOTE(review): presumably the counterpart of ``start_task_log`` -
        confirm against the logger module.
        """
        remove_task_file_handler()
883
    @classmethod
    def stop_encrypt_log(cls):
        """Call ``remove_encrypt_file_handler()``.

        NOTE(review): presumably the counterpart of ``start_encrypt_log`` -
        confirm against the logger module.
        """
        remove_encrypt_file_handler()
887
888    @staticmethod
889    def _find_test_root_descriptor(config):
890        if getattr(config, ConfigConst.task, None) or \
891                getattr(config, ConfigConst.testargs, None):
892            Scheduler._pre_component_test(config)
893
894        if getattr(config, ConfigConst.subsystems, "") or \
895                getattr(config, ConfigConst.parts, "") or \
896                getattr(config, ConfigConst.component_base_kit, ""):
897            uid = unique_id("Scheduler", "component")
898            if config.subsystems or config.parts:
899                test_set = (config.subsystems, config.parts)
900            else:
901                kit = getattr(config, ConfigConst.component_base_kit)
902                test_set = kit.get_white_list()
903
904            root = Descriptor(uuid=uid, name="component",
905                              source=TestSetSource(test_set),
906                              con=True)
907
908            root.children = find_test_descriptors(config)
909            return root
910            # read test list from testdict
911        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
912                config, ConfigConst.testfile, "") == "":
913            uid = unique_id("Scheduler", "testdict")
914            root = Descriptor(uuid=uid, name="testdict",
915                              source=TestSetSource(config.testdict),
916                              con=True)
917            root.children = find_testdict_descriptors(config)
918            return root
919
920            # read test list from testfile, testlist or task
921        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
922            config, ConfigConst.testlist, "") or getattr(
923            config, ConfigConst.task, "") or getattr(
924            config, ConfigConst.testcase)
925        if test_set:
926            fname, _ = get_filename_extension(test_set)
927            uid = unique_id("Scheduler", fname)
928            root = Descriptor(uuid=uid, name=fname,
929                              source=TestSetSource(test_set), con=True)
930            root.children = find_test_descriptors(config)
931            return root
932        else:
933            raise ParamError("no test file, list, dict, case or task found",
934                             error_no="00102")
935
    @classmethod
    def terminate_cmd_exec(cls):
        """Flag the running execution to stop and return the result.

        Sets ``Scheduler.is_execute`` to False, resets ``repeat_index`` to 0
        and ``auto_retry`` to -1, then returns
        ``Scheduler.terminate_result.get()``.
        """
        Scheduler.is_execute = False
        Scheduler.repeat_index = 0
        Scheduler.auto_retry = -1
        LOG.info("Start to terminate execution")
        # NOTE(review): terminate_result looks like a queue, so this get()
        # may block until a result is put - confirm against its definition
        return Scheduler.terminate_result.get()
943
944    @classmethod
945    def upload_case_result(cls, upload_param):
946        if not Scheduler.upload_address:
947            return
948        case_id, result, error, start_time, end_time, report_path = \
949            upload_param
950        if error and len(error) > MAX_VISIBLE_LENGTH:
951            error = "%s..." % error[:MAX_VISIBLE_LENGTH]
952        LOG.info(
953            "Get upload params: %s, %s, %s, %s, %s, %s" % (
954                case_id, result, error, start_time, end_time, report_path))
955        if Scheduler.proxy is not None:
956            Scheduler.proxy.upload_result(case_id, result, error, start_time,
957                                          end_time, report_path)
958        else:
959            LOG.debug("There is no proxy, can't upload case result")
960
961    @classmethod
962    def upload_module_result(cls, exec_message):
963        if not Scheduler.is_execute:
964            return
965        result_file = exec_message.get_result()
966        request = exec_message.get_request()
967        test_name = request.root.source.test_name
968        if not result_file or not os.path.exists(result_file):
969            LOG.error("%s result not exists", test_name, error_no="00200")
970            return
971
972        test_type = request.root.source.test_type
973        LOG.info("Need upload result: %s, test type: %s" %
974                 (result_file, test_type))
975        upload_params, _, _ = cls._get_upload_params(result_file, request)
976        if not upload_params:
977            LOG.error("%s no test case result to upload" % result_file,
978                      error_no="00201")
979            return
980        LOG.info("Need upload %s case" % len(upload_params))
981        upload_suite = []
982        for upload_param in upload_params:
983            case_id, result, error, start_time, end_time, report_path = \
984                upload_param
985            case = {"caseid": case_id, "result": result, "error": error,
986                    "start": start_time, "end": end_time,
987                    "report": report_path}
988            LOG.info("Case info: %s", case)
989            upload_suite.append(case)
990        if Scheduler.proxy is not None:
991            Scheduler.proxy.upload_batch(upload_suite)
992        else:
993            LOG.debug("There is no proxy, can't upload module result")
994
995    @classmethod
996    def _get_upload_params(cls, result_file, request):
997        upload_params = []
998        report_path = result_file
999        testsuites_element = DataHelper.parse_data_report(report_path)
1000        start_time, end_time = cls._get_time(testsuites_element)
1001        test_type = request.get_test_type()
1002        if test_type == HostDrivenTestType.device_test or test_type == HostDrivenTestType.windows_test:
1003            for model_element in testsuites_element:
1004                case_id = model_element.get(ReportConstant.name, "")
1005                case_result, error = cls.get_script_result(model_element)
1006                if error and len(error) > MAX_VISIBLE_LENGTH:
1007                    error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
1008                report = cls._get_report_path(
1009                    request.config.report_path,
1010                    model_element.get(ReportConstant.report, ""))
1011                upload_params.append(
1012                    (case_id, case_result, error, start_time, end_time, report,))
1013        else:
1014            for testsuite_element in testsuites_element:
1015                if check_mode(ModeType.developer):
1016                    module_name = str(get_filename_extension(
1017                        report_path)[0]).split(".")[0]
1018                else:
1019                    module_name = testsuite_element.get(ReportConstant.name,
1020                                                        "none")
1021                for case_element in testsuite_element:
1022                    case_id = cls._get_case_id(case_element, module_name)
1023                    case_result, error = cls._get_case_result(case_element)
1024                    if case_result == "Ignored":
1025                        LOG.info(
1026                            "Get upload params: {} result is ignored".format(case_id))
1027                        continue
1028                    if error and len(error) > MAX_VISIBLE_LENGTH:
1029                        error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
1030                    report = cls._get_report_path(
1031                        request.config.report_path,
1032                        case_element.get(ReportConstant.report, ""))
1033                    upload_params.append(
1034                        (case_id, case_result, error, start_time, end_time, report,))
1035        return upload_params, start_time, end_time
1036
1037    @classmethod
1038    def get_script_result(cls, model_element):
1039        disabled = int(model_element.get(ReportConstant.disabled)) if \
1040            model_element.get(ReportConstant.disabled, "") else 0
1041        failures = int(model_element.get(ReportConstant.failures)) if \
1042            model_element.get(ReportConstant.failures, "") else 0
1043        errors = int(model_element.get(ReportConstant.errors)) if \
1044            model_element.get(ReportConstant.errors, "") else 0
1045        unavailable = int(model_element.get(ReportConstant.unavailable)) if \
1046            model_element.get(ReportConstant.unavailable, "") else 0
1047        if failures > 0 or errors > 0:
1048            result = "Failed"
1049        elif disabled > 0 or unavailable > 0:
1050            result = "Unavailable"
1051        else:
1052            result = "Passed"
1053
1054        if result == "Passed":
1055            return result, ""
1056        if Scheduler.mode == ModeType.decc:
1057            result = "Failed"
1058        result_kind = model_element.get(ReportConstant.result_kind, "")
1059        if result_kind:
1060            result = result_kind
1061
1062        error_msg = model_element.get(ReportConstant.message, "")
1063        if not error_msg and len(model_element) > 0:
1064            error_msg = model_element[0].get(ReportConstant.message, "")
1065            if not error_msg and len(model_element[0]) > 0:
1066                error_msg = model_element[0][0].get(ReportConstant.message, "")
1067        return result, error_msg
1068
1069    @classmethod
1070    def _get_case_id(cls, case_element, package_name):
1071        class_name = case_element.get(ReportConstant.class_name, "none")
1072        method_name = case_element.get(ReportConstant.name, "none")
1073        case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name,
1074                                       class_name, method_name)
1075        return case_id
1076
1077    @classmethod
1078    def _get_case_result(cls, case_element):
1079        # get result
1080        case = Case()
1081        case.status = case_element.get(ReportConstant.status, "")
1082        case.result = case_element.get(ReportConstant.result, "")
1083        if case_element.get(ReportConstant.message, ""):
1084            case.message = case_element.get(ReportConstant.message)
1085        if len(case_element) > 0:
1086            if not case.result:
1087                case.result = ReportConstant.false
1088            case.message = case_element[0].get(ReportConstant.message)
1089        if case.is_passed():
1090            result = "Passed"
1091        elif case.is_failed():
1092            result = "Failed"
1093        elif case.is_blocked():
1094            result = "Blocked"
1095        elif case.is_ignored():
1096            result = "Ignored"
1097        elif case.is_completed():
1098            if case.message:
1099                result = "Failed"
1100            else:
1101                result = "Passed"
1102        else:
1103            result = "Unavailable"
1104        return result, case.message
1105
1106    @classmethod
1107    def _get_time(cls, testsuite_element):
1108        start_time = testsuite_element.get(ReportConstant.start_time, "")
1109        end_time = testsuite_element.get(ReportConstant.end_time, "")
1110        try:
1111            if start_time and end_time:
1112                start_time = int(time.mktime(time.strptime(
1113                    start_time, ReportConstant.time_format)) * 1000)
1114                end_time = int(time.mktime(time.strptime(
1115                    end_time, ReportConstant.time_format)) * 1000)
1116            else:
1117                timestamp = str(testsuite_element.get(
1118                    ReportConstant.time_stamp, "")).replace("T", " ")
1119                cost_time = testsuite_element.get(ReportConstant.time, "")
1120                if timestamp and cost_time:
1121                    try:
1122                        end_time = int(time.mktime(time.strptime(
1123                            timestamp, ReportConstant.time_format)) * 1000)
1124                    except ArithmeticError as error:
1125                        LOG.error("Get time error {}".format(error))
1126                        end_time = int(time.time() * 1000)
1127                    except ValueError as error:
1128                        LOG.error("Get time error {}".format(error))
1129                        end_time = int(time.mktime(time.strptime(
1130                            timestamp.split(".")[0], ReportConstant.time_format)) * 1000)
1131                    start_time = int(end_time - float(cost_time) * 1000)
1132                else:
1133                    current_time = int(time.time() * 1000)
1134                    start_time, end_time = current_time, current_time
1135        except ArithmeticError as error:
1136            LOG.error("Get time error {}".format(error))
1137            current_time = int(time.time() * 1000)
1138            start_time, end_time = current_time, current_time
1139        return start_time, end_time
1140
1141    @classmethod
1142    def _get_report_path(cls, base_path, report=""):
1143        """ get report path
1144        base_path: str, report base path
1145        report   : str, report relative path
1146        """
1147        report_path = os.path.join(base_path, report)
1148        return report_path if report and os.path.exists(report_path) else base_path
1149
1150    @classmethod
1151    def upload_task_result(cls, task, error_message=""):
1152        if not Scheduler.task_name:
1153            LOG.info("No need upload summary report")
1154            return
1155
1156        summary_data_report = os.path.join(task.config.report_path,
1157                                           ReportConstant.summary_data_report)
1158        if not os.path.exists(summary_data_report):
1159            Scheduler.upload_unavailable_result(str(
1160                error_message) or "summary report not exists",
1161                                                task.config.report_path)
1162            return
1163
1164        task_element = ElementTree.parse(summary_data_report).getroot()
1165        start_time, end_time = cls._get_time(task_element)
1166        task_result = cls._get_task_result(task_element)
1167        error_msg = ""
1168        for child in task_element:
1169            if child.get(ReportConstant.message, ""):
1170                error_msg = "{}{}".format(
1171                    error_msg, "%s;" % child.get(ReportConstant.message))
1172        if error_msg:
1173            error_msg = error_msg[:-1]
1174        report = cls._get_report_path(
1175            task.config.report_path, ReportConstant.summary_vision_report)
1176        cls.upload_case_result(
1177            (Scheduler.task_name, task_result, error_msg, start_time, end_time, report))
1178
1179    @classmethod
1180    def _get_task_result(cls, task_element):
1181        failures = int(task_element.get(ReportConstant.failures, 0))
1182        errors = int(task_element.get(ReportConstant.errors, 0))
1183        disabled = int(task_element.get(ReportConstant.disabled, 0))
1184        unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1185        if disabled > 0:
1186            task_result = "Blocked"
1187        elif errors > 0 or failures > 0:
1188            task_result = "Failed"
1189        elif unavailable > 0:
1190            task_result = "Unavailable"
1191        else:
1192            task_result = "Passed"
1193        return task_result
1194
1195    @classmethod
1196    def upload_unavailable_result(cls, error_msg, report_path=""):
1197        start_time = int(time.time() * 1000)
1198        Scheduler.upload_case_result((Scheduler.task_name, "Unavailable",
1199                                      error_msg, start_time, start_time,
1200                                      report_path))
1201
1202    @classmethod
1203    def upload_report_end(cls):
1204        if getattr(cls, "tmp_json", None):
1205            os.remove(cls.tmp_json)
1206            del cls.tmp_json
1207        LOG.info("Upload report end")
1208        if Scheduler.proxy is not None:
1209            Scheduler.proxy.report_end()
1210        else:
1211            LOG.debug("There is no proxy, can't upload report end")
1212
1213    @classmethod
1214    def is_module_need_retry(cls, task, module_name):
1215        failed_flag = False
1216        if check_mode(ModeType.decc):
1217            from xdevice import SuiteReporter
1218            for module, _ in SuiteReporter.get_failed_case_list():
1219                if module_name == module or str(module_name).split(
1220                        ".")[0] == module:
1221                    failed_flag = True
1222                    break
1223        else:
1224            from xdevice import ResultReporter
1225            history_report_path = \
1226                getattr(task.config, ConfigConst.history_report_path, "")
1227            params = ResultReporter.get_task_info_params(history_report_path)
1228            if params and params[ReportConst.unsuccessful_params]:
1229                if dict(params[ReportConst.unsuccessful_params]).get(
1230                        module_name, []):
1231                    failed_flag = True
1232                elif dict(params[ReportConst.unsuccessful_params]).get(
1233                        str(module_name).split(".")[0], []):
1234                    failed_flag = True
1235        return failed_flag
1236
1237    @classmethod
1238    def compare_spt_time(cls, kit_spt, device_spt):
1239        if not kit_spt or not device_spt:
1240            return False
1241        try:
1242            kit_time = str(kit_spt).split("-")[:2]
1243            device_time = str(device_spt).split("-")[:2]
1244            k_spt = datetime.datetime.strptime(
1245                "-".join(kit_time), "%Y-%m")
1246            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
1247        except ValueError as value_error:
1248            LOG.debug("Date format is error, %s" % value_error.args)
1249            return False
1250        month_interval = int(k_spt.month) - int(d_spt.month)
1251        year_interval = int(k_spt.year) - int(d_spt.year)
1252        LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
1253                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
1254        if year_interval < 0:
1255            return True
1256        if year_interval == 0 and month_interval in range(-11, 3):
1257            return True
1258        if year_interval == 1 and month_interval + 12 in (1, 2):
1259            return True
1260        return False
1261
1262    @classmethod
1263    def _parse_property_value(cls, property_name, driver_request, kit):
1264        test_args = copy.deepcopy(
1265            driver_request.config.get(ConfigConst.testargs, dict()))
1266        property_value = ""
1267        if ConfigConst.pass_through in test_args.keys():
1268            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
1269            property_value = pt_dict.get(property_name, None)
1270        elif property_name in test_args.keys:
1271            property_value = test_args.get(property_name, None)
1272        return property_value if property_value else \
1273            kit.properties.get(property_name, None)
1274
1275    @classmethod
1276    def _calculate_device_options(cls, device_options, environment_config,
1277                                  options, test_source):
1278        # calculate difference
1279        diff_value = len(environment_config) - len(device_options)
1280        if device_options and diff_value == 0:
1281            return device_options
1282
1283        else:
1284            diff_value = diff_value if diff_value else 1
1285            if str(test_source.source_file).endswith(".bin"):
1286                device_option = DeviceSelectionOption(
1287                    options, DeviceLabelType.ipcamera, test_source)
1288            else:
1289                device_option = DeviceSelectionOption(
1290                    options, None, test_source)
1291
1292            device_option.source_file = \
1293                test_source.source_file or test_source.source_string
1294            device_option.required_manager = "device"
1295            device_options.extend([device_option] * diff_value)
1296            LOG.debug("Assign device options and it's length is %s"
1297                      % len(device_options))
1298        return device_options
1299
    @classmethod
    def update_test_type_in_source(cls, key, value):
        """Set ``TestDictSource.test_type[key]`` to ``value``."""
        LOG.debug("update test type dict in source")
        TestDictSource.test_type[key] = value
1304
    @classmethod
    def update_ext_type_in_source(cls, key, value):
        """Set ``TestDictSource.exe_type[key]`` to ``value``."""
        LOG.debug("update ext type dict in source")
        TestDictSource.exe_type[key] = value
1309
    @classmethod
    def clear_test_dict_source(cls):
        """Delegate to ``TestDictSource.clear()``."""
        TestDictSource.clear()
1313
    @classmethod
    def reset_test_dict_source(cls):
        """Delegate to ``TestDictSource.reset()``."""
        TestDictSource.reset()
1317
1318    @classmethod
1319    def _pre_component_test(cls, config):
1320        if not config.kits:
1321            return
1322        cur_kit = None
1323        for kit in config.kits:
1324            if kit.__class__.__name__ == CKit.component:
1325                cur_kit = kit
1326                break
1327        if not cur_kit:
1328            return
1329        get_white_list = getattr(cur_kit, "get_white_list", None)
1330        if not callable(get_white_list):
1331            return
1332        subsystems, parts = get_white_list()
1333        if not subsystems and not parts:
1334            return
1335        setattr(config, ConfigConst.component_base_kit, cur_kit)
1336
1337    @classmethod
1338    def component_task_setup(cls, task, module_name):
1339        component_kit = task.config.get(ConfigConst.component_base_kit, None)
1340        if not component_kit:
1341            # only -p -s .you do not care about the components that can be
1342            # supported. you only want to run the use cases of the current
1343            # component
1344            return
1345        LOG.debug("Start component task setup")
1346        _component_mapper = task.config.get(ConfigConst.component_mapper)
1347        _subsystem, _part = _component_mapper.get(module_name)
1348
1349        is_hit = False
1350        # find in cache. if not find, update cache
1351        cache_subsystem, cache_part = component_kit.get_cache()
1352        if _subsystem in cache_subsystem or _part in cache_subsystem:
1353            is_hit = True
1354        if not is_hit:
1355            env_manager = EnvironmentManager()
1356            for _, manager in env_manager.managers.items():
1357                if getattr(manager, "devices_list", []):
1358                    for device in manager.devices_list:
1359                        component_kit.__setup__(device)
1360            cache_subsystem, cache_part = component_kit.get_cache()
1361            if _subsystem in cache_subsystem or _part in cache_subsystem:
1362                is_hit = True
1363        if not is_hit:
1364            LOG.warning("%s are skipped, no suitable component found. "
1365                        "Require subsystem=%s part=%s, no device match this"
1366                        % (module_name, _subsystem, _part))
1367
1368    @classmethod
1369    def construct_repeat_list(cls, task, index):
1370        repeat_list = list()
1371        for driver_index in range(len(task.test_drivers)):
1372            cur_test_driver = copy.deepcopy(task.test_drivers[driver_index])
1373            desc = cur_test_driver[1]
1374            desc.unique_id = '{}_{}'.format(desc.unique_id, index + 1)
1375            repeat_list.append(cur_test_driver)
1376        return repeat_list
1377
1378    @classmethod
1379    def start_auto_retry(cls):
1380        if not Scheduler.is_need_auto_retry:
1381            Scheduler.auto_retry = -1
1382            LOG.debug("No need auto retry")
1383            return
1384        if Scheduler.auto_retry > 0:
1385            Scheduler.auto_retry -= 1
1386            if Scheduler.auto_retry == 0:
1387                Scheduler.auto_retry = -1
1388            from _core.command.console import Console
1389            console = Console()
1390            console.command_parser("run --retry")
1391
1392    @classmethod
1393    def check_auto_retry(cls, options):
1394        if Scheduler.auto_retry < 0 and \
1395                int(getattr(options, ConfigConst.auto_retry, 0)) > 0:
1396            value = int(getattr(options, ConfigConst.auto_retry, 0))
1397            Scheduler.auto_retry = value if value <= 10 else 10
1398
1399    @staticmethod
1400    def call_life_stage_action(**kwargs):
1401        """
1402        call in different lift stage
1403        """
1404        from xdevice import Task
1405        from xdevice import Variables
1406        if Task.life_stage_listener is None:
1407            return
1408        stage = kwargs.get("stage", None)
1409        data = dict()
1410        if stage == LifeStage.task_start:
1411            data = {"type": stage, "name": Variables.task_name}
1412        elif stage == LifeStage.task_end:
1413            task = kwargs.get("task", None)
1414            error = kwargs.get("error", "")
1415            unavailable = kwargs.get("unavailable", 0)
1416            summary_data_report = os.path.join(task.config.report_path,
1417                                               ReportConstant.summary_data_report) if task else ""
1418            if not os.path.exists(summary_data_report):
1419                LOG.error("Call lifecycle error, summary report {} not exists".format(task.config.report_path))
1420                passed = failures = blocked = 0
1421            else:
1422                task_element = ElementTree.parse(summary_data_report).getroot()
1423                total_tests = int(task_element.get(ReportConstant.tests, 0))
1424                failures = int(task_element.get(ReportConstant.failures, 0))
1425                blocked = int(task_element.get(ReportConstant.disabled, 0))
1426                ignored = int(task_element.get(ReportConstant.ignored, 0))
1427                unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1428                passed = total_tests - failures - blocked - ignored
1429            data = {"type": stage, "name": Variables.task_name,
1430                    "passed": passed, "failures": failures,
1431                    "blocked": blocked, "unavailable": unavailable,
1432                    "error": error}
1433        elif stage == LifeStage.case_start:
1434            case_name = kwargs.get("case_name", "")
1435            data = {"type": stage, "name": case_name}
1436        elif stage == LifeStage.case_end:
1437            case_name = kwargs.get("case_name", "")
1438            case_result = kwargs.get("case_result", "")
1439            error_msg = kwargs.get("error_msg", "")
1440            data = {"type": stage, "name": case_name, "case_result": case_result, "error_msg": error_msg}
1441        else:
1442            LOG.error("Call lifecycle error, error param stage: {}".format(stage))
1443            return
1444        Task.life_stage_listener(data)
1445