• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2023 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import datetime
21import os
22import queue
23import shutil
24import json
25import threading
26from collections import Counter
27from _core.context.impl import BaseScheduler
28from _core.context.result import ExecuteFinished
29from _core.utils import unique_id
30from _core.utils import check_mode
31from _core.utils import get_filename_extension
32
33from _core.utils import get_cst_time
34from _core.environment.manager_env import EnvironmentManager
35from _core.error import ErrorMessage
36from _core.exception import ExecuteTerminate
37from _core.exception import LiteDeviceError
38from _core.exception import DeviceError
39from _core.executor.request import Request
40from _core.executor.request import Descriptor
41
42from _core.plugin import Config
43from _core.constants import SchedulerType
44from _core.plugin import Plugin
45from _core.constants import TestExecType
46from _core.constants import CKit
47from _core.constants import ModeType
48from _core.constants import DeviceLabelType
49from _core.constants import ConfigConst
50from _core.constants import ReportConst
51from _core.executor.concurrent import DriversThread
52from _core.executor.concurrent import ModuleThread
53from _core.executor.concurrent import ExecuteMessage
54from _core.executor.source import TestSetSource
55from _core.executor.source import find_test_descriptors
56from _core.executor.source import find_testdict_descriptors
57from _core.logger import platform_logger
58from _core.utils import convert_serial
59from _core.report.reporter_helper import ExecInfo
60
61from _core.context.life_stage import TaskStart, StageEvent
62from _core.context.life_stage import TaskEnd
63from _core.context.life_stage import CaseStart
64from _core.context.life_stage import CaseEnd
65from _core.context.upload import Uploader
66from _core.context.option_util import get_device_options
67from _core.context.tdd import TSD
68from _core.exception import ParamError
69from _core.context.handler import report_not_executed
70from _core.context.life_stage import ILifeStageListener
71from _core.variables import Variables
72
73__all__ = ["Scheduler"]
74
75LOG = platform_logger("Scheduler")
76
77
78@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
79class Scheduler(BaseScheduler):
80    """
81    The Scheduler is the main entry point for client code that wishes to
82    discover and execute tests.
83    """
84    lock = threading.Lock()
85    terminate_result = queue.Queue()
86    used_devices = {}
87
88    def _do_execute_(self, task):
89        Scheduler.used_devices.clear()
90        if task.config.exectype == TestExecType.device_test:
91            self._device_test_execute(task)
92        elif task.config.exectype == TestExecType.host_test:
93            self._host_test_execute(task)
94        else:
95            LOG.info("Exec type %s is bypassed" % task.config.exectype)
96
97    def __discover__(self, args):
98        """Discover task to execute"""
99        from _core.executor.request import Task
100        repeat = Variables.config.taskargs.get(ConfigConst.repeat)
101        if not repeat:
102            repeat = args.get(ConfigConst.repeat, 1)
103        args.update({ConfigConst.repeat: int(repeat)})
104        config = Config()
105        config.update(args)
106        task = Task(drivers=[])
107        task.init(config)
108        self.add_life_stage_listener(TaskListener())
109        action = args.get("action", "")
110        task_start = TaskStart(action)
111        self.notify_stage(task_start)
112        root_descriptor = self._find_test_root_descriptor(task.config)
113        task.set_root_descriptor(root_descriptor)
114        return task
115
116    def _device_test_execute(self, task):
117        try:
118            self.run_in_loop(task, run_func=self.run_dynamic_concurrent)
119        finally:
120            Scheduler.__reset_environment__(self.used_devices)
121
122    def _host_test_execute(self, task, ):
123        """Execute host test"""
124        self.run_in_loop(task, run_func=self.run_host_test)
125
126    def run_host_test(self, task, test_drivers, current_driver_threads, message_queue):
127        # get test driver and device
128        test_driver = test_drivers[0]
129
130        # display executing progress
131        self._display_executing_process(None, test_driver,
132                                        test_drivers)
133
134        # start driver thread
135        thread = self._start_driver_thread(current_driver_threads, (
136            None, message_queue, task, test_driver))
137
138        if task.config.scheduler == SchedulerType.synchronize:
139            thread.join()
140
141    def generate_task_report(self, task):
142        task_info = ExecInfo()
143        test_type = getattr(task.config, "testtype", [])
144        task_name = getattr(task.config, "task", "")
145        if task_name:
146            task_info.test_type = str(task_name).upper()
147        else:
148            task_info.test_type = ",".join(test_type) if test_type else "Test"
149        if self.used_devices:
150            serials = []
151            platforms = []
152            test_labels = []
153            for serial, device in self.used_devices.items():
154                serials.append(convert_serial(serial))
155                platform = str(device.test_platform)
156                test_label = str(device.label).capitalize()
157                if platform not in platforms:
158                    platforms.append(platform)
159                if test_label not in test_labels:
160                    test_labels.append(test_label)
161            task_info.device_name = ",".join(serials)
162            task_info.platform = ",".join(platforms)
163            task_info.device_label = ",".join(test_labels)
164        else:
165            task_info.device_name = "None"
166            task_info.platform = "None"
167            task_info.device_label = "None"
168        task_info.repeat = getattr(task.config, ConfigConst.repeat, 1)
169        task_info.test_time = task.config.start_time
170        task_info.product_info = getattr(task, "product_info", "")
171
172        return task_info
173
174    def __allocate_environment__(self, options, test_driver):
175        device_options = get_device_options(options, test_driver[1].source)
176        environment = None
177        env_manager = EnvironmentManager()
178        while True:
179            if not self.is_executing():
180                break
181            if not self.is_monitor_alive():
182                LOG.error("Queue monitor thread is dead.")
183                break
184            environment = env_manager.apply_environment(device_options)
185            applied_device_cnt = len(environment.devices)
186            required_device_cnt = len(device_options)
187            if applied_device_cnt == required_device_cnt:
188                return environment
189            else:
190                env_manager.release_environment(environment)
191                LOG.debug("'%s' is waiting available device",
192                          test_driver[1].source.test_name)
193                if env_manager.check_device_exist(device_options):
194                    continue
195                else:
196                    raise DeviceError(ErrorMessage.Common.Code_0101021.format(required_device_cnt, applied_device_cnt))
197
198        return environment
199
200    @classmethod
201    def __free_environment__(cls, environment):
202        env_manager = EnvironmentManager()
203        env_manager.release_environment(environment)
204
205    @staticmethod
206    def __reset_environment__(used_devices):
207        env_manager = EnvironmentManager()
208        env_manager.reset_environment(used_devices)
209
210    @classmethod
211    def _check_device_spt(cls, kit, driver_request, device):
212        kit_spt = cls._parse_property_value(ConfigConst.spt,
213                                            driver_request, kit)
214        if not kit_spt:
215            setattr(device, ConfigConst.task_state, False)
216            LOG.error("Spt is empty", error_no="00108")
217            return
218        if getattr(driver_request, ConfigConst.product_info, ""):
219            product_info = getattr(driver_request,
220                                   ConfigConst.product_info)
221            if not isinstance(product_info, dict):
222                LOG.warning("Product info should be dict, %s",
223                            product_info)
224                setattr(device, ConfigConst.task_state, False)
225                return
226            device_spt = product_info.get("Security Patch", None)
227            if not device_spt or not \
228                    Scheduler._compare_spt_time(kit_spt, device_spt):
229                LOG.error("The device %s spt is %s, "
230                          "and the test case spt is %s, "
231                          "which does not meet the requirements" %
232                          (device.device_sn, device_spt, kit_spt),
233                          error_no="00116")
234                setattr(device, ConfigConst.task_state, False)
235                return
236
237    def _decc_task_setup(self, environment, task):
238        config = Config()
239        config.update(task.config.__dict__)
240        config.environment = environment
241        driver_request = Request(config=config)
242
243        if environment is None:
244            return False
245
246        for device in environment.devices:
247            if not getattr(device, ConfigConst.need_kit_setup, True):
248                LOG.debug("Device %s need kit setup is false" % device)
249                continue
250
251            # do task setup for device
252            kits_copy = copy.deepcopy(task.config.kits)
253            setattr(device, ConfigConst.task_kits, kits_copy)
254            for kit in getattr(device, ConfigConst.task_kits, []):
255                if not self.is_executing():
256                    break
257                try:
258                    kit.__setup__(device, request=driver_request)
259                except (ParamError, ExecuteTerminate, DeviceError,
260                        LiteDeviceError, ValueError, TypeError,
261                        SyntaxError, AttributeError) as exception:
262                    error_no = getattr(exception, "error_no", "00000")
263                    LOG.exception(
264                        "Task setup device: %s, exception: %s" % (
265                            environment.__get_serial__(),
266                            exception), exc_info=False, error_no=error_no)
267                if kit.__class__.__name__ == CKit.query and \
268                        device.label in [DeviceLabelType.ipcamera]:
269                    self._check_device_spt(kit, driver_request, device)
270            LOG.debug("Set device %s need kit setup to false" % device)
271            setattr(device, ConfigConst.need_kit_setup, False)
272
273        for device in environment.devices:
274            if not getattr(device, ConfigConst.task_state, True):
275                return False
276
277        # set product_info to self.task
278        if getattr(driver_request, ConfigConst.product_info, "") and \
279                not getattr(task, ConfigConst.product_info, ""):
280            product_info = getattr(driver_request, ConfigConst.product_info)
281            if not isinstance(product_info, dict):
282                LOG.warning("Product info should be dict, %s",
283                            product_info)
284            else:
285                setattr(task, ConfigConst.product_info, product_info)
286        return True
287
288    def run_dynamic_concurrent(self, task, test_drivers, current_driver_threads, message_queue):
289        task_unused_env = []
290        test_driver = test_drivers[0]
291        self.notify_stage(CaseStart(test_driver[1].source.module_name, test_driver))
292
293        if getattr(task.config, ConfigConst.history_report_path, ""):
294            module_name = test_driver[1].source.module_name
295            if not self._is_module_need_retry(task, module_name):
296                self._display_executing_process(None, test_driver,
297                                                test_drivers)
298                LOG.info("%s are passed, no need to retry" % module_name)
299                self._append_history_result(task, module_name)
300                LOG.info("")
301                return
302
303        if getattr(task.config, ConfigConst.component_mapper, ""):
304            module_name = test_driver[1].source.module_name
305            self._component_task_setup(task, module_name)
306
307        # get environment
308        try:
309            environment = self.__allocate_environment__(
310                task.config.__dict__, test_driver)
311        except DeviceError as exception:
312            self._handle_device_error(exception, task, test_drivers)
313            self.notify_stage(CaseEnd(test_driver[1].source.module_name, "Failed", exception.args))
314            return
315        if not self.is_executing():
316            if environment:
317                Scheduler.__free_environment__(environment)
318
319        if check_mode(ModeType.decc) or getattr(
320                task.config, ConfigConst.check_device, False):
321            LOG.info("Start to check environment: %s" %
322                     environment.__get_serial__())
323            status = self._decc_task_setup(environment, task)
324            if not status:
325                self.__free_environment__(environment)
326                task_unused_env.append(environment)
327                error_message = "Load Error[00116]"
328                report_not_executed(task.config.report_path, [test_driver],
329                                    error_message, task)
330                return
331
332            else:
333                LOG.info("Environment %s check success",
334                         environment.__get_serial__())
335
336        # display executing progress
337        self._display_executing_process(environment, test_driver,
338                                        test_drivers)
339
340        # add to used devices and set need_kit_setup attribute
341        self._append_used_devices(environment, self.used_devices)
342
343        # start driver thread
344        self._start_driver_thread(current_driver_threads, (
345            environment, message_queue, task, test_driver))
346
347        self._do_taskkit_teardown(self.used_devices, task_unused_env)
348
349    @classmethod
350    def _append_history_result(cls, task, module_name):
351        history_report_path = getattr(
352            task.config, ConfigConst.history_report_path, "")
353        from _core.report.result_reporter import ResultReporter
354        params = ResultReporter.get_task_info_params(
355            history_report_path)
356
357        if not params or not params[ReportConst.data_reports]:
358            LOG.debug("Task info record data reports is empty")
359            return
360
361        report_data_dict = dict(params[ReportConst.data_reports])
362        if module_name not in report_data_dict.keys():
363            module_name_ = str(module_name).split(".")[0]
364            if module_name_ not in report_data_dict.keys():
365                LOG.error("%s not in data reports" % module_name)
366                return
367            module_name = module_name_
368
369        from xdevice import SuiteReporter
370        if check_mode(ModeType.decc):
371            virtual_report_path, report_result = SuiteReporter. \
372                get_history_result_by_module(module_name)
373            LOG.debug("Append history result: (%s, %s)" % (
374                virtual_report_path, report_result))
375            SuiteReporter.append_report_result(
376                (virtual_report_path, report_result))
377        else:
378            history_execute_result = report_data_dict.get(module_name, "")
379            LOG.info("Start copy %s" % history_execute_result)
380            file_name = get_filename_extension(history_execute_result)[0]
381            if os.path.exists(history_execute_result):
382                result_dir = \
383                    os.path.join(task.config.report_path, "result")
384                os.makedirs(result_dir, exist_ok=True)
385                target_execute_result = "%s.xml" % os.path.join(
386                    task.config.report_path, "result", file_name)
387                shutil.copyfile(history_execute_result, target_execute_result)
388                LOG.info("Copy %s to %s" % (
389                    history_execute_result, target_execute_result))
390                if check_mode(ModeType.controller):
391                    request = Request("", task.test_drivers[0][1], "", task.config)
392                    exec_message = ExecuteMessage("", "", "", "")
393                    exec_message.set_result(target_execute_result)
394                    exec_message.set_request(request)
395                    # # 是不是得加上調度已經停止的流程了
396                    Uploader.upload_module_result(exec_message)
397            else:
398                error_msg = "Copy failed! %s not exists!" % \
399                            history_execute_result
400                raise ParamError(error_msg)
401
402    def _handle_device_error(self, exception, task, test_drivers):
403        test_driver = test_drivers[0]
404        self._display_executing_process(None, test_driver, test_drivers)
405        error_message = str(exception)
406        LOG.exception(error_message, exc_info=False, error_no=exception.error_no)
407        report_not_executed(task.config.report_path, [test_driver], error_message, task)
408        test_source = test_driver[1].source
409        case_id = test_source.module_name or test_source.test_name
410        Uploader.upload_unavailable_result(error_message, case_id=case_id)
411        LOG.info("")
412
413    def _start_driver_thread(self, current_driver_threads, thread_params):
414        environment, message_queue, task, test_driver = thread_params
415
416        if task.config.scheduler == SchedulerType.module:
417            driver_thread = ModuleThread(test_driver, task, environment,
418                                         message_queue, self.lock)
419        else:
420            driver_thread = DriversThread(test_driver, task, environment,
421                                          message_queue)
422        thread_name = self._get_thread_name(current_driver_threads)
423        driver_thread.daemon = True
424        driver_thread.name = thread_name
425        driver_thread.set_listeners(self.__create_listeners__(task))
426        driver_thread.start()
427        current_driver_threads.setdefault(thread_name, driver_thread)
428        LOG.info(f"Driver executing in thread {driver_thread.ident}")
429        LOG.info(f"Thread {thread_name} execute started")
430        return driver_thread
431
432    @classmethod
433    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
434        for device in used_devices.values():
435            if getattr(device, ConfigConst.need_kit_setup, True):
436                continue
437
438            for kit in getattr(device, ConfigConst.task_kits, []):
439                try:
440                    kit.__teardown__(device)
441                except Exception as error:
442                    LOG.debug("Do task kit teardown: %s" % error)
443            setattr(device, ConfigConst.task_kits, [])
444            setattr(device, ConfigConst.need_kit_setup, True)
445
446        for environment in task_unused_env:
447            for device in environment.devices:
448                setattr(device, ConfigConst.task_state, True)
449                setattr(device, ConfigConst.need_kit_setup, True)
450
451    def _display_executing_process(self, environment, test_driver,
452                                   test_drivers):
453        source_content = test_driver[1].source.source_file or \
454                         test_driver[1].source.source_string
455        if environment is None:
456            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
457                     (self.test_number - len(test_drivers) + 1,
458                      self.test_number, source_content,
459                      test_driver[1].source.test_type))
460            return
461
462        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
463                 (self.test_number - len(test_drivers) + 1,
464                  self.test_number, source_content,
465                  environment.__get_serial__(),
466                  test_driver[1].source.test_type))
467
468    @classmethod
469    def _get_thread_name(cls, current_driver_threads):
470        thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
471        while thread_id in current_driver_threads.keys():
472            thread_id = get_cst_time().strftime('%Y-%m-%d-%H-%M-%S-%f')
473        return thread_id
474
475    @classmethod
476    def _append_used_devices(cls, environment, used_devices):
477        if environment is not None:
478            for device in environment.devices:
479                device_serial = device.__get_serial__() if device else "None"
480                if device_serial and device_serial not in used_devices.keys():
481                    used_devices[device_serial] = device
482
483    @classmethod
484    def _reset_environment(cls, environment="", config_file=""):
485        env_manager = EnvironmentManager()
486        env_manager.env_stop()
487        EnvironmentManager(environment, config_file)
488
489    @classmethod
490    def _restore_environment(cls):
491        env_manager = EnvironmentManager()
492        env_manager.env_stop()
493        EnvironmentManager()
494
495    def _on_task_error_(self, task, exception: Exception):
496        Uploader.upload_unavailable_result(str(exception.args))
497
498    def _on_execute_finished_(self, task, result: ExecuteFinished):
499        TSD.reset_test_dict_source()
500        if getattr(task.config, ConfigConst.test_environment, "") or \
501                getattr(task.config, ConfigConst.configfile, ""):
502            self._restore_environment()
503
504        self.notify_stage(TaskEnd(task.config.report_path, result.unavailable, result.error_msg))
505        Uploader.upload_task_result(task, result.error_msg)
506        self.upload_report_end()
507
508    @staticmethod
509    def _find_test_root_descriptor(config):
510        if getattr(config, ConfigConst.task, None) or \
511                getattr(config, ConfigConst.testargs, None):
512            Scheduler._pre_component_test(config)
513
514        if getattr(config, ConfigConst.subsystems, "") or \
515                getattr(config, ConfigConst.parts, "") or \
516                getattr(config, ConfigConst.component_base_kit, ""):
517            uid = unique_id("Scheduler", "component")
518            if config.subsystems or config.parts:
519                test_set = (config.subsystems, config.parts)
520            else:
521                kit = getattr(config, ConfigConst.component_base_kit)
522                test_set = kit.get_white_list()
523
524            root = Descriptor(uuid=uid, name="component",
525                              source=TestSetSource(test_set),
526                              container=True)
527
528            root.children = find_test_descriptors(config)
529            return root
530            # read test list from testdict
531        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
532                config, ConfigConst.testfile, "") == "":
533            uid = unique_id("Scheduler", "testdict")
534            root = Descriptor(uuid=uid, name="testdict",
535                              source=TestSetSource(config.testdict),
536                              container=True)
537            root.children = find_testdict_descriptors(config)
538            return root
539
540            # read test list from testfile, testlist or task
541        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
542            config, ConfigConst.testlist, "") or getattr(
543            config, ConfigConst.task, "") or getattr(
544            config, ConfigConst.testcase)
545        if test_set:
546            fname, _ = get_filename_extension(test_set)
547            uid = unique_id("Scheduler", fname)
548            root = Descriptor(uuid=uid, name=fname,
549                              source=TestSetSource(test_set), container=True)
550            if config.scheduler == SchedulerType.module:
551                Scheduler._find_children_module(root, config)
552            else:
553                Scheduler._find_children_default(root, config)
554            return root
555        else:
556            raise ParamError(ErrorMessage.Common.Code_0101022)
557
558    @staticmethod
559    def _find_children_default(root, config):
560        root.children = find_test_descriptors(config)
561
562    @staticmethod
563    def _find_children_module(root, config):
564        desc = find_test_descriptors(config)
565        common_kits = {}
566        task_list = {}
567        all_data = {}
568        for i in desc:
569            module_subsystem = i.source.module_subsystem
570            if module_subsystem not in task_list:
571                task_list.update({module_subsystem: 1})
572            else:
573                task_list.update({module_subsystem: task_list.get(module_subsystem) + 1})
574            if module_subsystem not in all_data:
575                all_data[module_subsystem] = {
576                    "run-command": [],
577                    'pre-push': [],
578                    "push": []
579                }
580            kits = Scheduler._get_test_kits(i.source.config_file)
581            for kit in kits:
582                if kit.get("type") == "AppInstallKit":
583                    continue
584                if kit.get("type") == "ShellKit":
585                    shell_kit = kit.get("run-command", [])
586                    for command in shell_kit:
587                        if "remount" in command or "mkdir" in command:
588                            all_data[module_subsystem].get("run-command").append(command)
589                if kit.get("type") == "PushKit":
590                    pre_push_kit = kit.get("pre-push", [])
591                    for command in pre_push_kit:
592                        if "remount" in command or "mkdir" in command:
593                            all_data[module_subsystem].get("pre-push").append(command)
594                    push_kit = kit.get("push", [])
595                    all_data[module_subsystem].get("push").extend(push_kit)
596        for _, value in all_data.items():
597            for key1, value1 in value.items():
598                common = value1
599                count = Counter(common)
600                new_common = [k for k, v in count.items() if v > 1]
601                value[key1] = new_common
602        for key, value in all_data.items():
603            common_kit_tem = [{
604                'type': 'ShellKit',
605                'run-command': value.get("run-command", [])
606            }, {
607                'type': 'CommonPushKit',
608                'pre-push': value.get("pre-push", []),
609                "push": value.get("push", [])
610            }]
611            common_kits.update({key: common_kit_tem})
612        LOG.debug(common_kits)
613        LOG.debug(task_list)
614        root.children = sorted(desc, key=lambda x: x.source.module_subsystem)
615        setattr(root, "common_kits", common_kits)
616        setattr(root, "task_list", task_list)
617
618    @staticmethod
619    def _get_test_kits(test_source):
620        try:
621            from _core.testkit.json_parser import JsonParser
622            json_config = JsonParser(test_source)
623            return json_config.get_kits()
624        except ParamError as error:
625            LOG.error(error, error_no=error.error_no)
626            return ""
627
628    @classmethod
629    def _terminate(cls):
630        LOG.info("Start to terminate execution")
631        return Scheduler.terminate_result.get()
632
633    @classmethod
634    def upload_report_end(cls):
635        if getattr(cls, "tmp_json", None):
636            os.remove(cls.tmp_json)
637            del cls.tmp_json
638        Uploader.upload_report_end()
639
640    @classmethod
641    def _is_module_need_retry(cls, task, module_name):
642        failed_flag = False
643        if check_mode(ModeType.decc):
644            from xdevice import SuiteReporter
645            for module, _ in SuiteReporter.get_failed_case_list():
646                if module_name == module or str(module_name).split(
647                        ".")[0] == module:
648                    failed_flag = True
649                    break
650        else:
651            from xdevice import ResultReporter
652            history_report_path = \
653                getattr(task.config, ConfigConst.history_report_path, "")
654            params = ResultReporter.get_task_info_params(history_report_path)
655            if params and params[ReportConst.unsuccessful_params]:
656                if dict(params[ReportConst.unsuccessful_params]).get(
657                        module_name, []):
658                    failed_flag = True
659                elif dict(params[ReportConst.unsuccessful_params]).get(
660                        str(module_name).split(".")[0], []):
661                    failed_flag = True
662        return failed_flag
663
664    @classmethod
665    def _compare_spt_time(cls, kit_spt, device_spt):
666        if not kit_spt or not device_spt:
667            return False
668        try:
669            kit_time = str(kit_spt).split("-")[:2]
670            device_time = str(device_spt).split("-")[:2]
671            k_spt = datetime.datetime.strptime(
672                "-".join(kit_time), "%Y-%m")
673            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
674        except ValueError as value_error:
675            LOG.debug("Date format is error, %s" % value_error.args)
676            return False
677        month_interval = int(k_spt.month) - int(d_spt.month)
678        year_interval = int(k_spt.year) - int(d_spt.year)
679        LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
680                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
681        if year_interval < 0:
682            return True
683        if year_interval == 0 and month_interval in range(-11, 3):
684            return True
685        if year_interval == 1 and month_interval + 12 in (1, 2):
686            return True
687        return False
688
689    @classmethod
690    def _parse_property_value(cls, property_name, driver_request, kit):
691        test_args = copy.deepcopy(
692            driver_request.config.get(ConfigConst.testargs, dict()))
693        property_value = ""
694        if ConfigConst.pass_through in test_args.keys():
695            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
696            property_value = pt_dict.get(property_name, None)
697        elif property_name in test_args.keys:
698            property_value = test_args.get(property_name, None)
699        return property_value if property_value else \
700            kit.properties.get(property_name, None)
701
702    @classmethod
703    def _pre_component_test(cls, config):
704        if not config.kits:
705            return
706        cur_kit = None
707        for kit in config.kits:
708            if kit.__class__.__name__ == CKit.component:
709                cur_kit = kit
710                break
711        if not cur_kit:
712            return
713        get_white_list = getattr(cur_kit, "get_white_list", None)
714        if not callable(get_white_list):
715            return
716        subsystems, parts = get_white_list()
717        if not subsystems and not parts:
718            return
719        setattr(config, ConfigConst.component_base_kit, cur_kit)
720
721    @classmethod
722    def _component_task_setup(cls, task, module_name):
723        component_kit = task.config.get(ConfigConst.component_base_kit, None)
724        if not component_kit:
725            # only -p -s .you do not care about the components that can be
726            # supported. you only want to run the use cases of the current
727            # component
728            return
729        LOG.debug("Start component task setup")
730        _component_mapper = task.config.get(ConfigConst.component_mapper)
731        _subsystem, _part = _component_mapper.get(module_name)
732
733        is_hit = False
734        # find in cache. if not find, update cache
735        cache_subsystem, cache_part = component_kit.get_cache()
736        if _subsystem in cache_subsystem or _part in cache_subsystem:
737            is_hit = True
738        if not is_hit:
739            env_manager = EnvironmentManager()
740            for _, manager in env_manager.managers.items():
741                if getattr(manager, "devices_list", []):
742                    for device in manager.devices_list:
743                        component_kit.__setup__(device)
744            cache_subsystem, cache_part = component_kit.get_cache()
745            if _subsystem in cache_subsystem or _part in cache_subsystem:
746                is_hit = True
747        if not is_hit:
748            LOG.warning("%s are skipped, no suitable component found. "
749                        "Require subsystem=%s part=%s, no device match this"
750                        % (module_name, _subsystem, _part))
751
752
753class TaskListener(ILifeStageListener):
754    def __on_event__(self, stage_event: StageEvent):
755        from xdevice import Task
756        if Task.life_stage_listener:
757            data = stage_event.get_data()
758            Task.life_stage_listener(data)
759