• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import datetime
21import os
22import queue
23import time
24import uuid
25import shutil
26import json
27from xml.etree import ElementTree
28
29from _core.utils import unique_id
30from _core.utils import check_mode
31from _core.utils import get_sub_path
32from _core.utils import get_filename_extension
33from _core.utils import convert_serial
34from _core.utils import convert_mac
35from _core.utils import get_instance_name
36from _core.utils import is_config_str
37from _core.utils import check_result_report
38from _core.utils import get_cst_time
39from _core.environment.manager_env import EnvironmentManager
40from _core.environment.manager_env import DeviceSelectionOption
41from _core.exception import ParamError
42from _core.exception import ExecuteTerminate
43from _core.exception import LiteDeviceError
44from _core.exception import DeviceError
45from _core.interface import LifeCycle
46from _core.executor.request import Request
47from _core.executor.request import Descriptor
48from _core.plugin import get_plugin
49from _core.plugin import Plugin
50from _core.plugin import Config
51from _core.report.reporter_helper import ExecInfo
52from _core.report.reporter_helper import ReportConstant
53from _core.report.reporter_helper import Case
54from _core.report.reporter_helper import DataHelper
55from _core.constants import TestExecType
56from _core.constants import CKit
57from _core.constants import ModeType
58from _core.constants import DeviceLabelType
59from _core.constants import SchedulerType
60from _core.constants import ListenerType
61from _core.constants import ConfigConst
62from _core.constants import ReportConst
63from _core.constants import HostDrivenTestType
64from _core.constants import LifeStage
65from _core.executor.concurrent import DriversThread
66from _core.executor.concurrent import QueueMonitorThread
67from _core.executor.concurrent import DriversDryRunThread
68from _core.executor.source import TestSetSource
69from _core.executor.source import find_test_descriptors
70from _core.executor.source import find_testdict_descriptors
71from _core.executor.source import TestDictSource
72from _core.logger import platform_logger
73from _core.logger import add_task_file_handler
74from _core.logger import remove_task_file_handler
75from _core.logger import add_encrypt_file_handler
76from _core.logger import remove_encrypt_file_handler
77
78__all__ = ["Scheduler"]
79LOG = platform_logger("Scheduler")
80
81MAX_VISIBLE_LENGTH = 150
82
83
84@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
85class Scheduler(object):
86    """
87    The Scheduler is the main entry point for client code that wishes to
88    discover and execute tests.
89    """
    # factory params
    # Run flag: __execute__ sets it to True, and the scheduling loops stop
    # handing out drivers (and clear the pending ones) once it reads False.
    is_execute = True
    # created once at class-definition time, so it is shared by all instances
    terminate_result = queue.Queue()
    # when non-empty, __execute__ uploads the task result after the task ends
    upload_address = ""
    task_type = ""
    task_name = ""
    # compared with ModeType.decc when not-executed drivers are cleared
    mode = ""
    proxy = None

    # command_queue to store test commands; __execute__ rewrites the newest
    # entry as a (task_id, command, report_path) tuple.
    # NOTE: class-level list, shared by all instances.
    command_queue = []
    # upper bound for len(command_queue); the oldest entry is dropped beyond it
    max_command_num = 50
    # the number of tests in current task
    test_number = 0
    device_labels = []
    # repeat count of the current task, 0 when the task is not repeated
    repeat_index = 0
    auto_retry = -1
    is_need_auto_retry = False

    # thread returned by _start_queue_monitor for the task being executed
    queue_monitor_thread = None
110
111    def __discover__(self, args):
112        """Discover task to execute"""
113        from _core.executor.request import Task
114        config = Config()
115        config.update(args)
116        task = Task(drivers=[])
117        task.init(config)
118
119        Scheduler.call_life_stage_action(stage=LifeStage.task_start)
120
121        root_descriptor = self._find_test_root_descriptor(task.config)
122        task.set_root_descriptor(root_descriptor)
123        return task
124
    def __execute__(self, task):
        """Execute a discovered task and always run the end-of-task steps.

        Steps, in order: record the running command, switch to the task
        specific environment when one is configured, expand the drivers for
        repeated runs, reject the task if any root child descriptor carries an
        error, then dispatch on ``task.config.exectype``.

        The exception types listed in the ``except`` clause are logged and
        turned into ``error_message``; they are not re-raised. Any other
        exception propagates after the ``finally`` block has run.
        """
        error_message = ""
        unavailable = 0
        try:
            Scheduler.is_execute = True
            if Scheduler.command_queue:
                LOG.debug("Run command: {}".format(convert_mac(Scheduler.command_queue[-1])))
                # replace the newest command by a (task_id, command,
                # report_path) record
                run_command = Scheduler.command_queue.pop()
                task_id = str(uuid.uuid1()).split("-")[0]
                Scheduler.command_queue.append((task_id, run_command,
                                                task.config.report_path))
                # keep at most max_command_num records, dropping the oldest
                if len(Scheduler.command_queue) > self.max_command_num:
                    Scheduler.command_queue.pop(0)

            # test_environment takes precedence over configfile
            if getattr(task.config, ConfigConst.test_environment, ""):
                self._reset_environment(task.config.get(
                    ConfigConst.test_environment, ""))
            elif getattr(task.config, ConfigConst.configfile, ""):
                self._reset_environment(config_file=task.config.get(
                    ConfigConst.configfile, ""))

            # rebuild the driver list when the task is configured to repeat
            if getattr(task.config, ConfigConst.repeat, 0) > 0:
                Scheduler.repeat_index = \
                    getattr(task.config, ConfigConst.repeat)
                drivers_list = list()
                # index runs from -1 up to repeat - 1; what each index yields
                # is decided by construct_repeat_list
                for index in range(-1, task.config.repeat):
                    repeat_list = self.construct_repeat_list(task, index)
                    if repeat_list:
                        drivers_list.extend(repeat_list)
                task.test_drivers = drivers_list
            else:
                Scheduler.repeat_index = 0

            self.test_number = len(task.test_drivers)
            # collect the errors of all root children; newer messages are
            # prepended, separated by ";"
            for des in task.root.children:
                if des.error:
                    error_message = "{};{}".format(des.error.error_msg, error_message)
                    unavailable += 1
            if error_message != "":
                error_message = "test source '{}' or its config json not exists".format(error_message)
                LOG.error("Exec task error: {}".format(error_message))

                raise ParamError(error_message)

            if task.config.exectype == TestExecType.device_test:
                self._device_test_execute(task)
            elif task.config.exectype == TestExecType.host_test:
                self._host_test_execute(task)
            else:
                LOG.info("Exec type %s is bypassed" % task.config.exectype)

        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
            # append the error number to the message when the exception has one
            error_no = getattr(exception, "error_no", "")
            error_message = "%s[%s]" % (str(exception), error_no) \
                if error_no else str(exception)
            # "00000" is used for logging when no error number is available
            error_no = error_no if error_no else "00000"
            LOG.exception(exception, exc_info=False, error_no=error_no)
        finally:
            Scheduler.reset_test_dict_source()
            # undo the environment switch made at the start of the task
            if getattr(task.config, ConfigConst.test_environment, "") or \
                    getattr(task.config, ConfigConst.configfile, ""):
                self._restore_environment()

            Scheduler.call_life_stage_action(stage=LifeStage.task_end,
                                             task=task, error=error_message,
                                             unavailable=unavailable)
            if Scheduler.upload_address:
                Scheduler.upload_task_result(task, error_message)
                Scheduler.upload_report_end()
196
197    def _device_test_execute(self, task):
198        used_devices = {}
199        try:
200            self._dynamic_concurrent_execute(task, used_devices)
201        finally:
202            Scheduler.__reset_environment__(used_devices)
203            # generate reports
204            self._generate_task_report(task, used_devices)
205
206    def _host_test_execute(self, task):
207        """Execute host test"""
208        try:
209            # initial params
210            current_driver_threads = {}
211            test_drivers = task.test_drivers
212            message_queue = queue.Queue()
213
214            # execute test drivers
215            self.queue_monitor_thread = self._start_queue_monitor(
216                message_queue, test_drivers, current_driver_threads)
217            while test_drivers:
218                if len(current_driver_threads) > 5:
219                    time.sleep(3)
220                    continue
221
222                # clear remaining test drivers when scheduler is terminated
223                if not Scheduler.is_execute:
224                    LOG.info("Clear test drivers")
225                    self._clear_not_executed(task, test_drivers)
226                    break
227
228                # get test driver and device
229                test_driver = test_drivers[0]
230
231                # display executing progress
232                self._display_executing_process(None, test_driver,
233                                                test_drivers)
234
235                # start driver thread
236                self._start_driver_thread(current_driver_threads, (
237                    None, message_queue, task, test_driver))
238                test_drivers.pop(0)
239
240            # wait for all drivers threads finished and do kit teardown
241            while True:
242                if not self.queue_monitor_thread.is_alive():
243                    break
244                time.sleep(3)
245
246        finally:
247            # generate reports
248            self._generate_task_report(task)
249
250    def _dry_run_device_test_execute(self, task):
251        try:
252            # initial params
253            used_devices = {}
254            current_driver_threads = {}
255            test_drivers = task.test_drivers
256            message_queue = queue.Queue()
257            task_unused_env = []
258
259            # execute test drivers
260            self.queue_monitor_thread = self._start_queue_monitor(
261                message_queue, test_drivers, current_driver_threads)
262            while test_drivers:
263                # clear remaining test drivers when scheduler is terminated
264                if not Scheduler.is_execute:
265                    LOG.info("Clear test drivers")
266                    self._clear_not_executed(task, test_drivers)
267                    break
268
269                # get test driver and device
270                test_driver = test_drivers[0]
271                # get environment
272                try:
273                    environment = self.__allocate_environment__(
274                        task.config.__dict__, test_driver)
275                except DeviceError as exception:
276                    self._handle_device_error(exception, task, test_drivers)
277                    continue
278
279                if not Scheduler.is_execute:
280                    if environment:
281                        Scheduler.__free_environment__(environment)
282                    continue
283
284                # start driver thread
285                thread_id = self._get_thread_id(current_driver_threads)
286                driver_thread = DriversDryRunThread(test_driver, task,
287                                                    environment,
288                                                    message_queue)
289                driver_thread.setDaemon(True)
290                driver_thread.set_thread_id(thread_id)
291                driver_thread.start()
292                current_driver_threads.setdefault(thread_id, driver_thread)
293
294                test_drivers.pop(0)
295
296            # wait for all drivers threads finished and do kit teardown
297            while True:
298                if not self.queue_monitor_thread.is_alive():
299                    break
300                time.sleep(3)
301
302            self._do_taskkit_teardown(used_devices, task_unused_env)
303        finally:
304            LOG.debug(
305                "Removing report_path: {}".format(task.config.report_path))
306            # delete reports
307            self.stop_task_logcat()
308            self.stop_encrypt_log()
309            shutil.rmtree(task.config.report_path)
310
311    def _generate_task_report(self, task, used_devices=None):
312        task_info = ExecInfo()
313        test_type = getattr(task.config, "testtype", [])
314        task_name = getattr(task.config, "task", "")
315        if task_name:
316            task_info.test_type = str(task_name).upper()
317        else:
318            task_info.test_type = ",".join(test_type) if test_type else "Test"
319        if used_devices:
320            serials = []
321            platforms = []
322            test_labels = []
323            for serial, device in used_devices.items():
324                serials.append(convert_serial(serial))
325                platform = str(device.label).capitalize()
326                test_label = str(device.label).capitalize()
327                if platform not in platforms:
328                    platforms.append(platform)
329                if test_label not in test_labels:
330                    test_labels.append(test_label)
331            task_info.device_name = ",".join(serials)
332            task_info.platform = ",".join(platforms)
333            task_info.device_label = ",".join(test_labels)
334        else:
335            task_info.device_name = "None"
336            task_info.platform = "None"
337            task_info.device_label = "None"
338        task_info.test_time = task.config.start_time
339        task_info.product_info = getattr(task, "product_info", "")
340
341        listeners = self._create_listeners(task)
342        for listener in listeners:
343            listener.__ended__(LifeCycle.TestTask, task_info,
344                               test_type=task_info.test_type, task=task)
345
346    @classmethod
347    def _create_listeners(cls, task):
348        listeners = []
349        # append log listeners
350        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
351        for log_listener in log_listeners:
352            log_listener_instance = log_listener.__class__()
353            listeners.append(log_listener_instance)
354        # append report listeners
355        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
356        for report_listener in report_listeners:
357            report_listener_instance = report_listener.__class__()
358            setattr(report_listener_instance, "report_path",
359                    task.config.report_path)
360            listeners.append(report_listener_instance)
361        # append upload listeners
362        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
363        for upload_listener in upload_listeners:
364            upload_listener_instance = upload_listener.__class__()
365            listeners.append(upload_listener_instance)
366        return listeners
367
368    @staticmethod
369    def _find_device_options(environment_config, options, test_source):
370        devices_option = []
371        index = 1
372        for device_dict in environment_config:
373            label = device_dict.get("label", "")
374            required_manager = device_dict.get("type", "device")
375            required_manager = \
376                required_manager if required_manager else "device"
377            if not label:
378                continue
379            device_option = DeviceSelectionOption(options, label, test_source)
380            device_dict.pop("type", None)
381            device_dict.pop("label", None)
382            device_option.required_manager = required_manager
383            device_option.extend_value = device_dict
384            device_option.source_file = \
385                test_source.config_file or test_source.source_string
386            if hasattr(device_option, "env_index"):
387                device_option.env_index = index
388            index += 1
389            devices_option.append(device_option)
390        return devices_option
391
392    def __allocate_environment__(self, options, test_driver):
393        device_options = self.get_device_options(options,
394                                                 test_driver[1].source)
395        environment = None
396        env_manager = EnvironmentManager()
397        while True:
398            if not Scheduler.is_execute:
399                break
400            if self.queue_monitor_thread and \
401                    not self.queue_monitor_thread.is_alive():
402                LOG.error("Queue monitor thread is dead.")
403                break
404            environment = env_manager.apply_environment(device_options)
405            if len(environment.devices) == len(device_options):
406                return environment
407            else:
408                env_manager.release_environment(environment)
409                LOG.debug("'%s' is waiting available device",
410                          test_driver[1].source.test_name)
411                if env_manager.check_device_exist(device_options):
412                    continue
413                else:
414                    LOG.debug("'%s' required %s devices, actually %s devices"
415                              " were found" % (test_driver[1].source.test_name,
416                                               len(device_options),
417                                               len(environment.devices)))
418                    raise DeviceError("The '%s' required device does not exist"
419                                      % test_driver[1].source.source_file,
420                                      error_no="00104")
421
422        return environment
423
424    @classmethod
425    def get_device_options(cls, options, test_source):
426        device_options = []
427        config_file = test_source.config_file
428        environment_config = []
429        from _core.testkit.json_parser import JsonParser
430        if test_source.source_string and is_config_str(
431                test_source.source_string):
432            json_config = JsonParser(test_source.source_string)
433            environment_config = json_config.get_environment()
434            device_options = cls._find_device_options(
435                environment_config, options, test_source)
436        elif config_file and os.path.exists(config_file):
437            json_config = JsonParser(test_source.config_file)
438            environment_config = json_config.get_environment()
439            device_options = cls._find_device_options(
440                environment_config, options, test_source)
441
442        device_options = cls._calculate_device_options(
443            device_options, environment_config, options, test_source)
444
445        if ConfigConst.component_mapper in options.keys():
446            required_component = options.get(ConfigConst.component_mapper). \
447                get(test_source.module_name, None)
448            for device_option in device_options:
449                device_option.required_component = required_component
450        return device_options
451
452    @staticmethod
453    def __free_environment__(environment):
454        env_manager = EnvironmentManager()
455        env_manager.release_environment(environment)
456
457    @staticmethod
458    def __reset_environment__(used_devices):
459        env_manager = EnvironmentManager()
460        env_manager.reset_environment(used_devices)
461
462    @classmethod
463    def _check_device_spt(cls, kit, driver_request, device):
464        kit_spt = cls._parse_property_value(ConfigConst.spt,
465                                            driver_request, kit)
466        if not kit_spt:
467            setattr(device, ConfigConst.task_state, False)
468            LOG.error("Spt is empty", error_no="00108")
469            return
470        if getattr(driver_request, ConfigConst.product_info, ""):
471            product_info = getattr(driver_request,
472                                   ConfigConst.product_info)
473            if not isinstance(product_info, dict):
474                LOG.warning("Product info should be dict, %s",
475                            product_info)
476                setattr(device, ConfigConst.task_state, False)
477                return
478            device_spt = product_info.get("Security Patch", None)
479            if not device_spt or not \
480                    Scheduler.compare_spt_time(kit_spt, device_spt):
481                LOG.error("The device %s spt is %s, "
482                          "and the test case spt is %s, "
483                          "which does not meet the requirements" %
484                          (device.device_sn, device_spt, kit_spt),
485                          error_no="00116")
486                setattr(device, ConfigConst.task_state, False)
487                return
488
489    def _decc_task_setup(self, environment, task):
490        config = Config()
491        config.update(task.config.__dict__)
492        config.environment = environment
493        driver_request = Request(config=config)
494
495        if environment is None:
496            return False
497
498        for device in environment.devices:
499            if not getattr(device, ConfigConst.need_kit_setup, True):
500                LOG.debug("Device %s need kit setup is false" % device)
501                continue
502
503            # do task setup for device
504            kits_copy = copy.deepcopy(task.config.kits)
505            setattr(device, ConfigConst.task_kits, kits_copy)
506            for kit in getattr(device, ConfigConst.task_kits, []):
507                if not Scheduler.is_execute:
508                    break
509                try:
510                    kit.__setup__(device, request=driver_request)
511                except (ParamError, ExecuteTerminate, DeviceError,
512                        LiteDeviceError, ValueError, TypeError,
513                        SyntaxError, AttributeError) as exception:
514                    error_no = getattr(exception, "error_no", "00000")
515                    LOG.exception(
516                        "Task setup device: %s, exception: %s" % (
517                            environment.__get_serial__(),
518                            exception), exc_info=False, error_no=error_no)
519                if kit.__class__.__name__ == CKit.query and \
520                        device.label in [DeviceLabelType.ipcamera]:
521                    self._check_device_spt(kit, driver_request, device)
522            LOG.debug("Set device %s need kit setup to false" % device)
523            setattr(device, ConfigConst.need_kit_setup, False)
524
525        for device in environment.devices:
526            if not getattr(device, ConfigConst.task_state, True):
527                return False
528
529        # set product_info to self.task
530        if getattr(driver_request, ConfigConst.product_info, "") and \
531                not getattr(task, ConfigConst.product_info, ""):
532            product_info = getattr(driver_request, ConfigConst.product_info)
533            if not isinstance(product_info, dict):
534                LOG.warning("Product info should be dict, %s",
535                            product_info)
536            else:
537                setattr(task, ConfigConst.product_info, product_info)
538        return True
539
    def _dynamic_concurrent_execute(self, task, used_devices):
        """Schedule every driver of a device test task onto allocated devices.

        For each pending driver: fire the ``case_start`` life stage, skip the
        module when a history report says it needs no retry, allocate an
        environment, optionally check it with ``_decc_task_setup``, then start
        a driver thread. After the loop the method waits for the queue monitor
        thread to end and runs the task kit teardown.

        Args:
            task: the task being executed.
            used_devices: dict owned by the caller; the environments that
                ran a driver are added to it via ``_append_used_devices``.
        """
        # initial params
        current_driver_threads = {}
        test_drivers = task.test_drivers
        message_queue = queue.Queue()
        task_unused_env = []

        # execute test drivers
        self.queue_monitor_thread = self._start_queue_monitor(
            message_queue, test_drivers, current_driver_threads)
        while test_drivers:
            # clear remaining test drivers when scheduler is terminated
            if not Scheduler.is_execute:
                LOG.info("Clear test drivers")
                self._clear_not_executed(task, test_drivers)
                break

            # get test driver and device
            test_driver = test_drivers[0]

            # call life stage
            Scheduler.call_life_stage_action(stage=LifeStage.case_start,
                                             case_name=test_driver[1].source.module_name)

            # retry run: modules that need no retry reuse their history result
            if getattr(task.config, ConfigConst.history_report_path, ""):
                module_name = test_driver[1].source.module_name
                if not self.is_module_need_retry(task, module_name):
                    self._display_executing_process(None, test_driver,
                                                    test_drivers)
                    LOG.info("%s are passed, no need to retry" % module_name)
                    self._append_history_result(task, module_name)
                    LOG.info("")
                    test_drivers.pop(0)
                    continue

            if getattr(task.config, ConfigConst.component_mapper, ""):
                module_name = test_driver[1].source.module_name
                self.component_task_setup(task, module_name)

            # get environment
            try:
                environment = self.__allocate_environment__(
                    task.config.__dict__, test_driver)
            except DeviceError as exception:
                # the handler reports the driver and pops it from test_drivers;
                # test_driver still refers to the failed driver here
                self._handle_device_error(exception, task, test_drivers)
                Scheduler.call_life_stage_action(stage=LifeStage.case_end,
                                                 case_name=test_driver[1].source.module_name,
                                                 case_result="Failed",
                                                 error_msg=exception.args)
                continue

            # the monitor thread ended although drivers are still pending:
            # start a new one with a fresh queue and thread dict, then retry
            # NOTE(review): `environment` is not freed on this path; if the
            # allocation above succeeded its devices may stay reserved -
            # confirm whether that can happen
            if not self.queue_monitor_thread.is_alive():
                LOG.debug("Restart queue monitor thread.")
                current_driver_threads = {}
                message_queue = queue.Queue()
                self.queue_monitor_thread = self._start_queue_monitor(
                    message_queue, test_drivers, current_driver_threads)
                continue

            # terminated while allocating: give the devices back; the check
            # at the top of the loop then clears the remaining drivers
            if not Scheduler.is_execute:
                if environment:
                    Scheduler.__free_environment__(environment)
                continue

            if check_mode(ModeType.decc) or getattr(
                    task.config, ConfigConst.check_device, False):
                LOG.info("Start to check environment: %s" %
                         environment.__get_serial__())
                status = self._decc_task_setup(environment, task)
                if not status:
                    # environment check failed: release it, remember it for
                    # the teardown and report the driver as not executed
                    Scheduler.__free_environment__(environment)
                    task_unused_env.append(environment)
                    error_message = "Load Error[00116]"
                    self.report_not_executed(task.config.report_path,
                                             [test_drivers[0]],
                                             error_message, task)
                    test_drivers.pop(0)
                    continue
                else:
                    LOG.info("Environment %s check success",
                             environment.__get_serial__())

            # display executing progress
            self._display_executing_process(environment, test_driver,
                                            test_drivers)

            # add to used devices and set need_kit_setup attribute
            self._append_used_devices(environment, used_devices)

            # start driver thread
            self._start_driver_thread(current_driver_threads, (
                environment, message_queue, task, test_driver))
            test_drivers.pop(0)

        # wait for all drivers threads finished and do kit teardown
        while True:
            if not self.queue_monitor_thread.is_alive():
                break
            time.sleep(3)

        self._do_taskkit_teardown(used_devices, task_unused_env)
641
642    @classmethod
643    def _append_history_result(cls, task, module_name):
644        history_report_path = getattr(
645            task.config, ConfigConst.history_report_path, "")
646        from _core.report.result_reporter import ResultReporter
647        params = ResultReporter.get_task_info_params(
648            history_report_path)
649
650        if not params or not params[ReportConst.data_reports]:
651            LOG.debug("Task info record data reports is empty")
652            return
653
654        report_data_dict = dict(params[ReportConst.data_reports])
655        if module_name not in report_data_dict.keys():
656            module_name_ = str(module_name).split(".")[0]
657            if module_name_ not in report_data_dict.keys():
658                LOG.error("%s not in data reports" % module_name)
659                return
660            module_name = module_name_
661
662        from xdevice import SuiteReporter
663        if check_mode(ModeType.decc):
664            virtual_report_path, report_result = SuiteReporter. \
665                get_history_result_by_module(module_name)
666            LOG.debug("Append history result: (%s, %s)" % (
667                virtual_report_path, report_result))
668            SuiteReporter.append_report_result(
669                (virtual_report_path, report_result))
670        else:
671            history_execute_result = report_data_dict.get(module_name, "")
672            LOG.info("Start copy %s" % history_execute_result)
673            file_name = get_filename_extension(history_execute_result)[0]
674            if os.path.exists(history_execute_result):
675                result_dir = \
676                    os.path.join(task.config.report_path, "result")
677                os.makedirs(result_dir, exist_ok=True)
678                target_execute_result = "%s.xml" % os.path.join(
679                    task.config.report_path, "result", file_name)
680                shutil.copyfile(history_execute_result, target_execute_result)
681                LOG.info("Copy %s to %s" % (
682                    history_execute_result, target_execute_result))
683            else:
684                error_msg = "Copy failed! %s not exists!" % \
685                            history_execute_result
686                raise ParamError(error_msg)
687
688    def _handle_device_error(self, exception, task, test_drivers):
689        self._display_executing_process(None, test_drivers[0], test_drivers)
690        error_message = "%s: %s" % \
691                        (get_instance_name(exception), exception)
692        LOG.exception(error_message, exc_info=False,
693                      error_no=exception.error_no)
694        if check_mode(ModeType.decc):
695            error_message = "Load Error[00104]"
696        self.report_not_executed(task.config.report_path, [test_drivers[0]],
697                                 error_message, task)
698
699        LOG.info("")
700        test_drivers.pop(0)
701
702    @classmethod
703    def _clear_not_executed(cls, task, test_drivers):
704        if Scheduler.mode != ModeType.decc:
705            # clear all
706            test_drivers.clear()
707            return
708        # The result is reported only in DECC mode, and also clear all.
709        LOG.error("Case no run: task execution terminated!", error_no="00300")
710        error_message = "Execute Terminate[00300]"
711        cls.report_not_executed(task.config.report_path, test_drivers,
712                                error_message)
713        test_drivers.clear()
714
715    @classmethod
716    def report_not_executed(cls, report_path, test_drivers, error_message,
717                            task=None):
718        # traversing list to get remained elements
719        for test_driver in test_drivers:
720            # get report file
721            if task and getattr(task.config, "testdict", ""):
722                report_file = os.path.join(get_sub_path(
723                    test_driver[1].source.source_file),
724                    "%s.xml" % test_driver[1].source.test_name)
725            else:
726                report_file = os.path.join(
727                    report_path, "result",
728                    "%s.xml" % test_driver[1].source.module_name)
729
730            # get report name
731            report_name = test_driver[1].source.test_name if \
732                not test_driver[1].source.test_name.startswith("{") \
733                else "report"
734
735            # get module name
736            module_name = test_driver[1].source.module_name
737
738            # here, normally create empty report and then upload result
739            check_result_report(report_path, report_file, error_message,
740                                report_name, module_name)
741
742    def _start_driver_thread(self, current_driver_threads, thread_params):
743        environment, message_queue, task, test_driver = thread_params
744        thread_id = self._get_thread_id(current_driver_threads)
745        driver_thread = DriversThread(test_driver, task, environment,
746                                      message_queue)
747        driver_thread.setDaemon(True)
748        driver_thread.set_thread_id(thread_id)
749        driver_thread.set_listeners(self._create_listeners(task))
750        driver_thread.start()
751        current_driver_threads.setdefault(thread_id, driver_thread)
752
753    @classmethod
754    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
755        for device in used_devices.values():
756            if getattr(device, ConfigConst.need_kit_setup, True):
757                continue
758
759            for kit in getattr(device, ConfigConst.task_kits, []):
760                try:
761                    kit.__teardown__(device)
762                except Exception as error:
763                    LOG.debug("Do task kit teardown: %s" % error)
764            setattr(device, ConfigConst.task_kits, [])
765            setattr(device, ConfigConst.need_kit_setup, True)
766
767        for environment in task_unused_env:
768            for device in environment.devices:
769                setattr(device, ConfigConst.task_state, True)
770                setattr(device, ConfigConst.need_kit_setup, True)
771
772    def _display_executing_process(self, environment, test_driver,
773                                   test_drivers):
774        source_content = test_driver[1].source.source_file or \
775                         test_driver[1].source.source_string
776        if environment is None:
777            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
778                     (self.test_number - len(test_drivers) + 1,
779                      self.test_number, source_content,
780                      test_driver[1].source.test_type))
781            return
782
783        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
784                 (self.test_number - len(test_drivers) + 1,
785                  self.test_number, source_content,
786                  environment.__get_serial__(),
787                  test_driver[1].source.test_type))
788
789    @classmethod
790    def _get_thread_id(cls, current_driver_threads):
791        thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
792        while thread_id in current_driver_threads.keys():
793            thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
794        return thread_id
795
796    @classmethod
797    def _append_used_devices(cls, environment, used_devices):
798        if environment is not None:
799            for device in environment.devices:
800                device_serial = device.__get_serial__() if device else "None"
801                if device_serial and device_serial not in used_devices.keys():
802                    used_devices[device_serial] = device
803
804    @staticmethod
805    def _start_queue_monitor(message_queue, test_drivers,
806                             current_driver_threads):
807        queue_monitor_thread = QueueMonitorThread(message_queue,
808                                                  current_driver_threads,
809                                                  test_drivers)
810        queue_monitor_thread.setDaemon(True)
811        queue_monitor_thread.start()
812        return queue_monitor_thread
813
814    def exec_command(self, command, options):
815        """
816        Directly executes a command without adding it to the command queue.
817        """
818        if command != "run":
819            raise ParamError("unsupported command action: %s" % command,
820                             error_no="00100")
821        exec_type = options.exectype
822        if exec_type in [TestExecType.device_test, TestExecType.host_test,
823                         TestExecType.host_driven_test]:
824            self._exec_task(options)
825        else:
826            LOG.error("Unsupported execution type '%s'" % exec_type,
827                      error_no="00100")
828
829        return
830
831    def _exec_task(self, options):
832        """
833        Directly allocates a device and execute a device test.
834        """
835        try:
836            self.check_auto_retry(options)
837            task = self.__discover__(options.__dict__)
838            self.__execute__(task)
839        except (ParamError, ValueError, TypeError, SyntaxError,
840                AttributeError) as exception:
841            error_no = getattr(exception, "error_no", "00000")
842            LOG.exception("%s: %s" % (get_instance_name(exception), exception),
843                          exc_info=False, error_no=error_no)
844            if Scheduler.upload_address:
845                Scheduler.upload_unavailable_result(str(exception.args))
846                Scheduler.upload_report_end()
847        finally:
848            self.stop_task_logcat()
849            self.stop_encrypt_log()
850            self.start_auto_retry()
851
852    @classmethod
853    def _reset_environment(cls, environment="", config_file=""):
854        env_manager = EnvironmentManager()
855        env_manager.env_stop()
856        EnvironmentManager(environment, config_file)
857
858    @classmethod
859    def _restore_environment(cls):
860        env_manager = EnvironmentManager()
861        env_manager.env_stop()
862        EnvironmentManager()
863
864    @classmethod
865    def start_task_log(cls, log_path):
866        tool_file_name = "task_log.log"
867        tool_log_file = os.path.join(log_path, tool_file_name)
868        add_task_file_handler(tool_log_file)
869
870    @classmethod
871    def start_encrypt_log(cls, log_path):
872        from _core.report.encrypt import check_pub_key_exist
873        if check_pub_key_exist():
874            encrypt_file_name = "task_log.ept"
875            encrypt_log_file = os.path.join(log_path, encrypt_file_name)
876            add_encrypt_file_handler(encrypt_log_file)
877
878    @classmethod
879    def stop_task_logcat(cls):
880        remove_task_file_handler()
881
882    @classmethod
883    def stop_encrypt_log(cls):
884        remove_encrypt_file_handler()
885
886    @staticmethod
887    def _find_test_root_descriptor(config):
888        if getattr(config, ConfigConst.task, None) or \
889                getattr(config, ConfigConst.testargs, None):
890            Scheduler._pre_component_test(config)
891
892        if getattr(config, ConfigConst.subsystems, "") or \
893                getattr(config, ConfigConst.parts, "") or \
894                getattr(config, ConfigConst.component_base_kit, ""):
895            uid = unique_id("Scheduler", "component")
896            if config.subsystems or config.parts:
897                test_set = (config.subsystems, config.parts)
898            else:
899                kit = getattr(config, ConfigConst.component_base_kit)
900                test_set = kit.get_white_list()
901
902            root = Descriptor(uuid=uid, name="component",
903                              source=TestSetSource(test_set),
904                              con=True)
905
906            root.children = find_test_descriptors(config)
907            return root
908            # read test list from testdict
909        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
910                config, ConfigConst.testfile, "") == "":
911            uid = unique_id("Scheduler", "testdict")
912            root = Descriptor(uuid=uid, name="testdict",
913                              source=TestSetSource(config.testdict),
914                              con=True)
915            root.children = find_testdict_descriptors(config)
916            return root
917
918            # read test list from testfile, testlist or task
919        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
920            config, ConfigConst.testlist, "") or getattr(
921            config, ConfigConst.task, "") or getattr(
922            config, ConfigConst.testcase)
923        if test_set:
924            fname, _ = get_filename_extension(test_set)
925            uid = unique_id("Scheduler", fname)
926            root = Descriptor(uuid=uid, name=fname,
927                              source=TestSetSource(test_set), con=True)
928            root.children = find_test_descriptors(config)
929            return root
930        else:
931            raise ParamError("no test file, list, dict, case or task found",
932                             error_no="00102")
933
934    @classmethod
935    def terminate_cmd_exec(cls):
936        Scheduler.is_execute = False
937        Scheduler.repeat_index = 0
938        Scheduler.auto_retry = -1
939        LOG.info("Start to terminate execution")
940        return Scheduler.terminate_result.get()
941
942    @classmethod
943    def upload_case_result(cls, upload_param):
944        if not Scheduler.upload_address:
945            return
946        case_id, result, error, start_time, end_time, report_path = \
947            upload_param
948        if error and len(error) > MAX_VISIBLE_LENGTH:
949            error = "%s..." % error[:MAX_VISIBLE_LENGTH]
950        LOG.info(
951            "Get upload params: %s, %s, %s, %s, %s, %s" % (
952                case_id, result, error, start_time, end_time, report_path))
953        if Scheduler.proxy is not None:
954            Scheduler.proxy.upload_result(case_id, result, error, start_time,
955                                          end_time, report_path)
956        else:
957            LOG.debug("There is no proxy, can't upload case result")
958
959    @classmethod
960    def upload_module_result(cls, exec_message):
961        if not Scheduler.is_execute:
962            return
963        result_file = exec_message.get_result()
964        request = exec_message.get_request()
965        test_name = request.root.source.test_name
966        if not result_file or not os.path.exists(result_file):
967            LOG.error("%s result not exists", test_name, error_no="00200")
968            return
969
970        test_type = request.root.source.test_type
971        LOG.info("Need upload result: %s, test type: %s" %
972                 (result_file, test_type))
973        upload_params, _, _ = cls._get_upload_params(result_file, request)
974        if not upload_params:
975            LOG.error("%s no test case result to upload" % result_file,
976                      error_no="00201")
977            return
978        LOG.info("Need upload %s case" % len(upload_params))
979        upload_suite = []
980        for upload_param in upload_params:
981            case_id, result, error, start_time, end_time, report_path = \
982                upload_param
983            case = {"caseid": case_id, "result": result, "error": error,
984                    "start": start_time, "end": end_time,
985                    "report": report_path}
986            LOG.info("Case info: %s", case)
987            upload_suite.append(case)
988        if Scheduler.proxy is not None:
989            Scheduler.proxy.upload_batch(upload_suite)
990        else:
991            LOG.debug("There is no proxy, can't upload module result")
992
993    @classmethod
994    def _get_upload_params(cls, result_file, request):
995        upload_params = []
996        report_path = result_file
997        testsuites_element = DataHelper.parse_data_report(report_path)
998        start_time, end_time = cls._get_time(testsuites_element)
999        test_type = request.get_test_type()
1000        if test_type == HostDrivenTestType.device_test or test_type == HostDrivenTestType.windows_test:
1001            for model_element in testsuites_element:
1002                case_id = model_element.get(ReportConstant.name, "")
1003                case_result, error = cls.get_script_result(model_element)
1004                if error and len(error) > MAX_VISIBLE_LENGTH:
1005                    error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
1006                report = cls._get_report_path(
1007                    request.config.report_path,
1008                    model_element.get(ReportConstant.report, ""))
1009                upload_params.append(
1010                    (case_id, case_result, error, start_time, end_time, report,))
1011        else:
1012            for testsuite_element in testsuites_element:
1013                if check_mode(ModeType.developer):
1014                    module_name = str(get_filename_extension(
1015                        report_path)[0]).split(".")[0]
1016                else:
1017                    module_name = testsuite_element.get(ReportConstant.name,
1018                                                        "none")
1019                for case_element in testsuite_element:
1020                    case_id = cls._get_case_id(case_element, module_name)
1021                    case_result, error = cls._get_case_result(case_element)
1022                    if case_result == "Ignored":
1023                        LOG.info(
1024                            "Get upload params: {} result is ignored".format(case_id))
1025                        continue
1026                    if error and len(error) > MAX_VISIBLE_LENGTH:
1027                        error = "{}...".format(error[:MAX_VISIBLE_LENGTH])
1028                    report = cls._get_report_path(
1029                        request.config.report_path,
1030                        case_element.get(ReportConstant.report, ""))
1031                    upload_params.append(
1032                        (case_id, case_result, error, start_time, end_time, report,))
1033        return upload_params, start_time, end_time
1034
1035    @classmethod
1036    def get_script_result(cls, model_element):
1037        disabled = int(model_element.get(ReportConstant.disabled)) if \
1038            model_element.get(ReportConstant.disabled, "") else 0
1039        failures = int(model_element.get(ReportConstant.failures)) if \
1040            model_element.get(ReportConstant.failures, "") else 0
1041        errors = int(model_element.get(ReportConstant.errors)) if \
1042            model_element.get(ReportConstant.errors, "") else 0
1043        unavailable = int(model_element.get(ReportConstant.unavailable)) if \
1044            model_element.get(ReportConstant.unavailable, "") else 0
1045        if failures > 0 or errors > 0:
1046            result = "Failed"
1047        elif disabled > 0 or unavailable > 0:
1048            result = "Unavailable"
1049        else:
1050            result = "Passed"
1051
1052        if result == "Passed":
1053            return result, ""
1054        if Scheduler.mode == ModeType.decc:
1055            result = "Failed"
1056        result_kind = model_element.get(ReportConstant.result_kind, "")
1057        if result_kind:
1058            result = result_kind
1059
1060        error_msg = model_element.get(ReportConstant.message, "")
1061        if not error_msg and len(model_element) > 0:
1062            error_msg = model_element[0].get(ReportConstant.message, "")
1063            if not error_msg and len(model_element[0]) > 0:
1064                error_msg = model_element[0][0].get(ReportConstant.message, "")
1065        return result, error_msg
1066
1067    @classmethod
1068    def _get_case_id(cls, case_element, package_name):
1069        class_name = case_element.get(ReportConstant.class_name, "none")
1070        method_name = case_element.get(ReportConstant.name, "none")
1071        case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name,
1072                                       class_name, method_name)
1073        return case_id
1074
1075    @classmethod
1076    def _get_case_result(cls, case_element):
1077        # get result
1078        case = Case()
1079        case.status = case_element.get(ReportConstant.status, "")
1080        case.result = case_element.get(ReportConstant.result, "")
1081        if case_element.get(ReportConstant.message, ""):
1082            case.message = case_element.get(ReportConstant.message)
1083        if len(case_element) > 0:
1084            if not case.result:
1085                case.result = ReportConstant.false
1086            case.message = case_element[0].get(ReportConstant.message)
1087        if case.is_passed():
1088            result = "Passed"
1089        elif case.is_failed():
1090            result = "Failed"
1091        elif case.is_blocked():
1092            result = "Blocked"
1093        elif case.is_ignored():
1094            result = "Ignored"
1095        elif case.is_completed():
1096            if case.message:
1097                result = "Failed"
1098            else:
1099                result = "Passed"
1100        else:
1101            result = "Unavailable"
1102        return result, case.message
1103
1104    @classmethod
1105    def _get_time(cls, testsuite_element):
1106        start_time = testsuite_element.get(ReportConstant.start_time, "")
1107        end_time = testsuite_element.get(ReportConstant.end_time, "")
1108        try:
1109            if start_time and end_time:
1110                start_time = int(time.mktime(time.strptime(
1111                    start_time, ReportConstant.time_format)) * 1000)
1112                end_time = int(time.mktime(time.strptime(
1113                    end_time, ReportConstant.time_format)) * 1000)
1114            else:
1115                timestamp = str(testsuite_element.get(
1116                    ReportConstant.time_stamp, "")).replace("T", " ")
1117                cost_time = testsuite_element.get(ReportConstant.time, "")
1118                if timestamp and cost_time:
1119                    try:
1120                        end_time = int(time.mktime(time.strptime(
1121                            timestamp, ReportConstant.time_format)) * 1000)
1122                    except ArithmeticError as error:
1123                        LOG.error("Get time error {}".format(error))
1124                        end_time = int(time.time() * 1000)
1125                    except ValueError as error:
1126                        LOG.error("Get time error {}".format(error))
1127                        end_time = int(time.mktime(time.strptime(
1128                            timestamp.split(".")[0], ReportConstant.time_format)) * 1000)
1129                    start_time = int(end_time - float(cost_time) * 1000)
1130                else:
1131                    current_time = int(time.time() * 1000)
1132                    start_time, end_time = current_time, current_time
1133        except ArithmeticError as error:
1134            LOG.error("Get time error {}".format(error))
1135            current_time = int(time.time() * 1000)
1136            start_time, end_time = current_time, current_time
1137        return start_time, end_time
1138
1139    @classmethod
1140    def _get_report_path(cls, base_path, report=""):
1141        """ get report path
1142        base_path: str, report base path
1143        report   : str, report relative path
1144        """
1145        report_path = os.path.join(base_path, report)
1146        return report_path if report and os.path.exists(report_path) else base_path
1147
1148    @classmethod
1149    def upload_task_result(cls, task, error_message=""):
1150        if not Scheduler.task_name:
1151            LOG.info("No need upload summary report")
1152            return
1153
1154        summary_data_report = os.path.join(task.config.report_path,
1155                                           ReportConstant.summary_data_report)
1156        if not os.path.exists(summary_data_report):
1157            Scheduler.upload_unavailable_result(str(
1158                error_message) or "summary report not exists",
1159                                                task.config.report_path)
1160            return
1161
1162        task_element = ElementTree.parse(summary_data_report).getroot()
1163        start_time, end_time = cls._get_time(task_element)
1164        task_result = cls._get_task_result(task_element)
1165        error_msg = ""
1166        for child in task_element:
1167            if child.get(ReportConstant.message, ""):
1168                error_msg = "{}{}".format(
1169                    error_msg, "%s;" % child.get(ReportConstant.message))
1170        if error_msg:
1171            error_msg = error_msg[:-1]
1172        report = cls._get_report_path(
1173            task.config.report_path, ReportConstant.summary_vision_report)
1174        cls.upload_case_result(
1175            (Scheduler.task_name, task_result, error_msg, start_time, end_time, report))
1176
1177    @classmethod
1178    def _get_task_result(cls, task_element):
1179        failures = int(task_element.get(ReportConstant.failures, 0))
1180        errors = int(task_element.get(ReportConstant.errors, 0))
1181        disabled = int(task_element.get(ReportConstant.disabled, 0))
1182        unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1183        if disabled > 0:
1184            task_result = "Blocked"
1185        elif errors > 0 or failures > 0:
1186            task_result = "Failed"
1187        elif unavailable > 0:
1188            task_result = "Unavailable"
1189        else:
1190            task_result = "Passed"
1191        return task_result
1192
1193    @classmethod
1194    def upload_unavailable_result(cls, error_msg, report_path=""):
1195        start_time = int(time.time() * 1000)
1196        Scheduler.upload_case_result((Scheduler.task_name, "Unavailable",
1197                                      error_msg, start_time, start_time,
1198                                      report_path))
1199
1200    @classmethod
1201    def upload_report_end(cls):
1202        if getattr(cls, "tmp_json", None):
1203            os.remove(cls.tmp_json)
1204            del cls.tmp_json
1205        LOG.info("Upload report end")
1206        if Scheduler.proxy is not None:
1207            Scheduler.proxy.report_end()
1208        else:
1209            LOG.debug("There is no proxy, can't upload report end")
1210
1211    @classmethod
1212    def is_module_need_retry(cls, task, module_name):
1213        failed_flag = False
1214        if check_mode(ModeType.decc):
1215            from xdevice import SuiteReporter
1216            for module, _ in SuiteReporter.get_failed_case_list():
1217                if module_name == module or str(module_name).split(
1218                        ".")[0] == module:
1219                    failed_flag = True
1220                    break
1221        else:
1222            from xdevice import ResultReporter
1223            history_report_path = \
1224                getattr(task.config, ConfigConst.history_report_path, "")
1225            params = ResultReporter.get_task_info_params(history_report_path)
1226            if params and params[ReportConst.unsuccessful_params]:
1227                if dict(params[ReportConst.unsuccessful_params]).get(
1228                        module_name, []):
1229                    failed_flag = True
1230                elif dict(params[ReportConst.unsuccessful_params]).get(
1231                        str(module_name).split(".")[0], []):
1232                    failed_flag = True
1233        return failed_flag
1234
1235    @classmethod
1236    def compare_spt_time(cls, kit_spt, device_spt):
1237        if not kit_spt or not device_spt:
1238            return False
1239        try:
1240            kit_time = str(kit_spt).split("-")[:2]
1241            device_time = str(device_spt).split("-")[:2]
1242            k_spt = datetime.datetime.strptime(
1243                "-".join(kit_time), "%Y-%m")
1244            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
1245        except ValueError as value_error:
1246            LOG.debug("Date format is error, %s" % value_error.args)
1247            return False
1248        month_interval = int(k_spt.month) - int(d_spt.month)
1249        year_interval = int(k_spt.year) - int(d_spt.year)
1250        LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
1251                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
1252        if year_interval < 0:
1253            return True
1254        if year_interval == 0 and month_interval in range(-11, 3):
1255            return True
1256        if year_interval == 1 and month_interval + 12 in (1, 2):
1257            return True
1258        return False
1259
1260    @classmethod
1261    def _parse_property_value(cls, property_name, driver_request, kit):
1262        test_args = copy.deepcopy(
1263            driver_request.config.get(ConfigConst.testargs, dict()))
1264        property_value = ""
1265        if ConfigConst.pass_through in test_args.keys():
1266            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
1267            property_value = pt_dict.get(property_name, None)
1268        elif property_name in test_args.keys:
1269            property_value = test_args.get(property_name, None)
1270        return property_value if property_value else \
1271            kit.properties.get(property_name, None)
1272
1273    @classmethod
1274    def _calculate_device_options(cls, device_options, environment_config,
1275                                  options, test_source):
1276        # calculate difference
1277        diff_value = len(environment_config) - len(device_options)
1278        if device_options and diff_value == 0:
1279            return device_options
1280
1281        else:
1282            diff_value = diff_value if diff_value else 1
1283            if str(test_source.source_file).endswith(".bin"):
1284                device_option = DeviceSelectionOption(
1285                    options, DeviceLabelType.ipcamera, test_source)
1286            else:
1287                device_option = DeviceSelectionOption(
1288                    options, None, test_source)
1289
1290            device_option.source_file = \
1291                test_source.source_file or test_source.source_string
1292            device_option.required_manager = "device"
1293            device_options.extend([device_option] * diff_value)
1294            LOG.debug("Assign device options and it's length is %s"
1295                      % len(device_options))
1296        return device_options
1297
1298    @classmethod
1299    def update_test_type_in_source(cls, key, value):
1300        LOG.debug("update test type dict in source")
1301        TestDictSource.test_type[key] = value
1302
1303    @classmethod
1304    def update_ext_type_in_source(cls, key, value):
1305        LOG.debug("update ext type dict in source")
1306        TestDictSource.exe_type[key] = value
1307
1308    @classmethod
1309    def clear_test_dict_source(cls):
1310        TestDictSource.clear()
1311
1312    @classmethod
1313    def reset_test_dict_source(cls):
1314        TestDictSource.reset()
1315
1316    @classmethod
1317    def _pre_component_test(cls, config):
1318        if not config.kits:
1319            return
1320        cur_kit = None
1321        for kit in config.kits:
1322            if kit.__class__.__name__ == CKit.component:
1323                cur_kit = kit
1324                break
1325        if not cur_kit:
1326            return
1327        get_white_list = getattr(cur_kit, "get_white_list", None)
1328        if not callable(get_white_list):
1329            return
1330        subsystems, parts = get_white_list()
1331        if not subsystems and not parts:
1332            return
1333        setattr(config, ConfigConst.component_base_kit, cur_kit)
1334
1335    @classmethod
1336    def component_task_setup(cls, task, module_name):
1337        component_kit = task.config.get(ConfigConst.component_base_kit, None)
1338        if not component_kit:
1339            # only -p -s .you do not care about the components that can be
1340            # supported. you only want to run the use cases of the current
1341            # component
1342            return
1343        LOG.debug("Start component task setup")
1344        _component_mapper = task.config.get(ConfigConst.component_mapper)
1345        _subsystem, _part = _component_mapper.get(module_name)
1346
1347        is_hit = False
1348        # find in cache. if not find, update cache
1349        cache_subsystem, cache_part = component_kit.get_cache()
1350        if _subsystem in cache_subsystem or _part in cache_subsystem:
1351            is_hit = True
1352        if not is_hit:
1353            env_manager = EnvironmentManager()
1354            for _, manager in env_manager.managers.items():
1355                if getattr(manager, "devices_list", []):
1356                    for device in manager.devices_list:
1357                        component_kit.__setup__(device)
1358            cache_subsystem, cache_part = component_kit.get_cache()
1359            if _subsystem in cache_subsystem or _part in cache_subsystem:
1360                is_hit = True
1361        if not is_hit:
1362            LOG.warning("%s are skipped, no suitable component found. "
1363                        "Require subsystem=%s part=%s, no device match this"
1364                        % (module_name, _subsystem, _part))
1365
1366    @classmethod
1367    def construct_repeat_list(cls, task, index):
1368        repeat_list = list()
1369        for driver_index in range(len(task.test_drivers)):
1370            cur_test_driver = copy.deepcopy(task.test_drivers[driver_index])
1371            desc = cur_test_driver[1]
1372            desc.unique_id = '{}_{}'.format(desc.unique_id, index + 1)
1373            repeat_list.append(cur_test_driver)
1374        return repeat_list
1375
1376    @classmethod
1377    def start_auto_retry(cls):
1378        if not Scheduler.is_need_auto_retry:
1379            Scheduler.auto_retry = -1
1380            LOG.debug("No need auto retry")
1381            return
1382        if Scheduler.auto_retry > 0:
1383            Scheduler.auto_retry -= 1
1384            if Scheduler.auto_retry == 0:
1385                Scheduler.auto_retry = -1
1386            from _core.command.console import Console
1387            console = Console()
1388            console.command_parser("run --retry")
1389
1390    @classmethod
1391    def check_auto_retry(cls, options):
1392        if Scheduler.auto_retry < 0 and \
1393                int(getattr(options, ConfigConst.auto_retry, 0)) > 0:
1394            value = int(getattr(options, ConfigConst.auto_retry, 0))
1395            Scheduler.auto_retry = value if value <= 10 else 10
1396
1397    @staticmethod
1398    def call_life_stage_action(**kwargs):
1399        """
1400        call in different lift stage
1401        """
1402        from xdevice import Task
1403        from xdevice import Variables
1404        if Task.life_stage_listener is None:
1405            return
1406        stage = kwargs.get("stage", None)
1407        data = dict()
1408        if stage == LifeStage.task_start:
1409            data = {"type": stage, "name": Variables.task_name}
1410        elif stage == LifeStage.task_end:
1411            task = kwargs.get("task", None)
1412            error = kwargs.get("error", "")
1413            unavailable = kwargs.get("unavailable", 0)
1414            summary_data_report = os.path.join(task.config.report_path,
1415                                               ReportConstant.summary_data_report) if task else ""
1416            if not os.path.exists(summary_data_report):
1417                LOG.error("Call lifecycle error, summary report {} not exists".format(task.config.report_path))
1418                passed = failures = blocked = 0
1419            else:
1420                task_element = ElementTree.parse(summary_data_report).getroot()
1421                total_tests = int(task_element.get(ReportConstant.tests, 0))
1422                failures = int(task_element.get(ReportConstant.failures, 0))
1423                blocked = int(task_element.get(ReportConstant.disabled, 0))
1424                ignored = int(task_element.get(ReportConstant.ignored, 0))
1425                unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1426                passed = total_tests - failures - blocked - ignored
1427            data = {"type": stage, "name": Variables.task_name,
1428                    "passed": passed, "failures": failures,
1429                    "blocked": blocked, "unavailable": unavailable,
1430                    "error": error}
1431        elif stage == LifeStage.case_start:
1432            case_name = kwargs.get("case_name", "")
1433            data = {"type": stage, "name": case_name}
1434        elif stage == LifeStage.case_end:
1435            case_name = kwargs.get("case_name", "")
1436            case_result = kwargs.get("case_result", "")
1437            error_msg = kwargs.get("error_msg", "")
1438            data = {"type": stage, "name": case_name, "case_result": case_result, "error_msg": error_msg}
1439        else:
1440            LOG.error("Call lifecycle error, error param stage: {}".format(stage))
1441            return
1442        Task.life_stage_listener(data)
1443