• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import datetime
21import os
22import queue
23import time
24import uuid
25import shutil
26from xml.etree import ElementTree
27
28from _core.utils import unique_id
29from _core.utils import check_mode
30from _core.utils import get_sub_path
31from _core.utils import get_filename_extension
32from _core.utils import convert_serial
33from _core.utils import get_instance_name
34from _core.utils import is_config_str
35from _core.utils import check_result_report
36from _core.utils import get_cst_time
37from _core.environment.manager_env import EnvironmentManager
38from _core.environment.manager_env import DeviceSelectionOption
39from _core.exception import ParamError
40from _core.exception import ExecuteTerminate
41from _core.exception import LiteDeviceError
42from _core.exception import DeviceError
43from _core.interface import LifeCycle
44from _core.executor.request import Request
45from _core.executor.request import Task
46from _core.executor.request import Descriptor
47from _core.plugin import get_plugin
48from _core.plugin import Plugin
49from _core.plugin import Config
50from _core.report.reporter_helper import ExecInfo
51from _core.report.reporter_helper import ReportConstant
52from _core.report.reporter_helper import Case
53from _core.report.reporter_helper import DataHelper
54from _core.constants import TestExecType
55from _core.constants import CKit
56from _core.constants import ModeType
57from _core.constants import DeviceLabelType
58from _core.constants import SchedulerType
59from _core.constants import ListenerType
60from _core.constants import ConfigConst
61from _core.constants import ReportConst
62from _core.constants import HostDrivenTestType
63from _core.executor.concurrent import DriversThread
64from _core.executor.concurrent import QueueMonitorThread
65from _core.executor.concurrent import DriversDryRunThread
66from _core.executor.source import TestSetSource
67from _core.executor.source import find_test_descriptors
68from _core.executor.source import find_testdict_descriptors
69from _core.executor.source import TestDictSource
70from _core.logger import platform_logger
71from _core.logger import add_task_file_handler
72from _core.logger import remove_task_file_handler
73from _core.logger import add_encrypt_file_handler
74from _core.logger import remove_encrypt_file_handler
75
# Names exported by ``from <module> import *``.
__all__ = ["Scheduler"]
# Module-level logger used by the Scheduler methods below.
LOG = platform_logger("Scheduler")

# NOTE(review): not referenced in the visible part of this file; presumably
# a cap on the length of displayed text - confirm against the rest of the
# module.
MAX_VISIBLE_LENGTH = 150
80
81
@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
class Scheduler(object):
    """
    The Scheduler is the main entry point for client code that wishes to
    discover and execute tests.

    It is registered as a scheduler plugin through the ``Plugin`` decorator.
    The attributes below are defined on the class, so they are shared by
    every instance unless an instance assigns its own value.
    """
    # factory params
    # set to True when __execute__ starts; the scheduling loops poll it and
    # stop dispatching drivers once it becomes False
    is_execute = True
    # NOTE(review): not used in the visible part of the file - presumably
    # carries termination results between threads; confirm
    terminate_result = queue.Queue()
    # when non-empty, __execute__ uploads the task result in its finally
    upload_address = ""
    task_type = ""
    task_name = ""
    # compared against ModeType.decc in _clear_not_executed
    mode = ""
    proxy = None

    # command_queue to store test commands; __execute__ drops the oldest
    # entry once the queue holds more than max_command_num items
    command_queue = []
    max_command_num = 50
    # the number of tests in current task
    test_number = 0
    device_labels = []
    # NOTE(review): retry settings are not read in the visible part of the
    # file; semantics of -1 to be confirmed
    auto_retry = -1
    is_need_auto_retry = False
105
106    def __discover__(self, args):
107        """Discover task to execute"""
108        config = Config()
109        config.update(args)
110        task = Task(drivers=[])
111        task.init(config)
112
113        root_descriptor = self._find_test_root_descriptor(task.config)
114        task.set_root_descriptor(root_descriptor)
115        return task
116
117    def __execute__(self, task):
118        error_message = ""
119        try:
120            Scheduler.is_execute = True
121            if Scheduler.command_queue:
122                LOG.debug("Run command: %s" % Scheduler.command_queue[-1])
123                run_command = Scheduler.command_queue.pop()
124                task_id = str(uuid.uuid1()).split("-")[0]
125                Scheduler.command_queue.append((task_id, run_command,
126                                                task.config.report_path))
127                if len(Scheduler.command_queue) > self.max_command_num:
128                    Scheduler.command_queue.pop(0)
129
130            if getattr(task.config, ConfigConst.test_environment, ""):
131                self._reset_environment(task.config.get(
132                    ConfigConst.test_environment, ""))
133            elif getattr(task.config, ConfigConst.configfile, ""):
134                self._reset_environment(config_file=task.config.get(
135                    ConfigConst.configfile, ""))
136
137            # do with the count of repeat about a task
138            if getattr(task.config, ConfigConst.repeat, 0) > 0:
139                drivers_list = list()
140                for repeat_index in range(task.config.repeat):
141                    for driver_index in range(len(task.test_drivers)):
142                        drivers_list.append(
143                            copy.deepcopy(task.test_drivers[driver_index]))
144                task.test_drivers = drivers_list
145
146            self.test_number = len(task.test_drivers)
147
148            if task.config.exectype == TestExecType.device_test:
149                self._device_test_execute(task)
150            elif task.config.exectype == TestExecType.host_test:
151                self._host_test_execute(task)
152            else:
153                LOG.info("Exec type %s is bypassed" % task.config.exectype)
154
155        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
156                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
157            error_no = getattr(exception, "error_no", "")
158            error_message = "%s[%s]" % (str(exception), error_no) \
159                if error_no else str(exception)
160            error_no = error_no if error_no else "00000"
161            LOG.exception(exception, exc_info=False, error_no=error_no)
162
163        finally:
164            Scheduler.reset_test_dict_source()
165            if getattr(task.config, ConfigConst.test_environment, "") or \
166                    getattr(task.config, ConfigConst.configfile, ""):
167                self._restore_environment()
168
169            if Scheduler.upload_address:
170                Scheduler.upload_task_result(task, error_message)
171                Scheduler.upload_report_end()
172
173    def _device_test_execute(self, task):
174        used_devices = {}
175        try:
176            self._dynamic_concurrent_execute(task, used_devices)
177        finally:
178            Scheduler.__reset_environment__(used_devices)
179            # generate reports
180            self._generate_task_report(task, used_devices)
181
182    def _host_test_execute(self, task):
183        """Execute host test"""
184        try:
185            # initial params
186            current_driver_threads = {}
187            test_drivers = task.test_drivers
188            message_queue = queue.Queue()
189
190            # execute test drivers
191            queue_monitor_thread = self._start_queue_monitor(
192                message_queue, test_drivers, current_driver_threads)
193            while test_drivers:
194                if len(current_driver_threads) > 5:
195                    time.sleep(3)
196                    continue
197
198                # clear remaining test drivers when scheduler is terminated
199                if not Scheduler.is_execute:
200                    LOG.info("Clear test drivers")
201                    self._clear_not_executed(task, test_drivers)
202                    break
203
204                # get test driver and device
205                test_driver = test_drivers[0]
206
207                # display executing progress
208                self._display_executing_process(None, test_driver,
209                                                test_drivers)
210
211                # start driver thread
212                self._start_driver_thread(current_driver_threads, (
213                    None, message_queue, task, test_driver))
214                test_drivers.pop(0)
215
216            # wait for all drivers threads finished and do kit teardown
217            while True:
218                if not queue_monitor_thread.is_alive():
219                    break
220                time.sleep(3)
221
222        finally:
223            # generate reports
224            self._generate_task_report(task)
225
226    def _dry_run_device_test_execute(self, task):
227        try:
228            # initial params
229            used_devices = {}
230            current_driver_threads = {}
231            test_drivers = task.test_drivers
232            message_queue = queue.Queue()
233            task_unused_env = []
234
235            # execute test drivers
236            queue_monitor_thread = self._start_queue_monitor(
237                message_queue, test_drivers, current_driver_threads)
238            while test_drivers:
239                # clear remaining test drivers when scheduler is terminated
240                if not Scheduler.is_execute:
241                    LOG.info("Clear test drivers")
242                    self._clear_not_executed(task, test_drivers)
243                    break
244
245                # get test driver and device
246                test_driver = test_drivers[0]
247                # get environment
248                try:
249                    environment = self.__allocate_environment__(
250                        task.config.__dict__, test_driver)
251                except DeviceError as exception:
252                    self._handle_device_error(exception, task, test_drivers)
253                    continue
254
255                if not Scheduler.is_execute:
256                    if environment:
257                        Scheduler.__free_environment__(environment)
258                    continue
259
260                # start driver thread
261                thread_id = self._get_thread_id(current_driver_threads)
262                driver_thread = DriversDryRunThread(test_driver, task, environment,
263                                              message_queue)
264                driver_thread.setDaemon(True)
265                driver_thread.set_thread_id(thread_id)
266                driver_thread.start()
267                current_driver_threads.setdefault(thread_id, driver_thread)
268
269                test_drivers.pop(0)
270
271            # wait for all drivers threads finished and do kit teardown
272            while True:
273                if not queue_monitor_thread.is_alive():
274                    break
275                time.sleep(3)
276
277            self._do_taskkit_teardown(used_devices, task_unused_env)
278        finally:
279            LOG.debug("Removing report_path: {}".format(task.config.report_path))
280            # delete reports
281            self.stop_task_logcat()
282            self.stop_encrypt_log()
283            shutil.rmtree(task.config.report_path)
284
285    def _generate_task_report(self, task, used_devices=None):
286        task_info = ExecInfo()
287        test_type = getattr(task.config, "testtype", [])
288        task_name = getattr(task.config, "task", "")
289        if task_name:
290            task_info.test_type = str(task_name).upper()
291        else:
292            task_info.test_type = ",".join(test_type) if test_type else "Test"
293        if used_devices:
294            serials = []
295            platforms = []
296            for serial, device in used_devices.items():
297                serials.append(convert_serial(serial))
298                platform = str(device.label).capitalize()
299                if platform not in platforms:
300                    platforms.append(platform)
301            task_info.device_name = ",".join(serials)
302            task_info.platform = ",".join(platforms)
303        else:
304            task_info.device_name = "None"
305            task_info.platform = "None"
306        task_info.test_time = task.config.start_time
307        task_info.product_info = getattr(task, "product_info", "")
308
309        listeners = self._create_listeners(task)
310        for listener in listeners:
311            listener.__ended__(LifeCycle.TestTask, task_info,
312                               test_type=task_info.test_type)
313
314    @classmethod
315    def _create_listeners(cls, task):
316        listeners = []
317        # append log listeners
318        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
319        for log_listener in log_listeners:
320            log_listener_instance = log_listener.__class__()
321            listeners.append(log_listener_instance)
322        # append report listeners
323        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
324        for report_listener in report_listeners:
325            report_listener_instance = report_listener.__class__()
326            setattr(report_listener_instance, "report_path",
327                    task.config.report_path)
328            listeners.append(report_listener_instance)
329        # append upload listeners
330        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
331        for upload_listener in upload_listeners:
332            upload_listener_instance = upload_listener.__class__()
333            listeners.append(upload_listener_instance)
334        return listeners
335
336    @staticmethod
337    def _find_device_options(environment_config, options, test_source):
338        devices_option = []
339        index = 1
340        for device_dict in environment_config:
341            label = device_dict.get("label", "")
342            required_manager = device_dict.get("type", "device")
343            required_manager = \
344                required_manager if required_manager else "device"
345            if not label:
346                continue
347            device_option = DeviceSelectionOption(options, label, test_source)
348            device_dict.pop("type", None)
349            device_dict.pop("label", None)
350            device_option.required_manager = required_manager
351            device_option.extend_value = device_dict
352            device_option.source_file = \
353                test_source.config_file or test_source.source_string
354            if hasattr(device_option, "env_index"):
355                device_option.env_index = index
356            index += 1
357            devices_option.append(device_option)
358        return devices_option
359
360    def __allocate_environment__(self, options, test_driver):
361        device_options = self.get_device_options(options,
362                                                 test_driver[1].source)
363        environment = None
364        env_manager = EnvironmentManager()
365        while True:
366            if not Scheduler.is_execute:
367                break
368            environment = env_manager.apply_environment(device_options)
369            if len(environment.devices) == len(device_options):
370                return environment
371            else:
372                env_manager.release_environment(environment)
373                LOG.debug("'%s' is waiting available device",
374                          test_driver[1].source.test_name)
375                if env_manager.check_device_exist(device_options):
376                    continue
377                else:
378                    LOG.debug("'%s' required %s devices, actually %s devices"
379                              " were found" % (test_driver[1].source.test_name,
380                                               len(device_options),
381                                               len(environment.devices)))
382                    raise DeviceError("The '%s' required device does not exist"
383                                      % test_driver[1].source.source_file,
384                                      error_no="00104")
385
386        return environment
387
388    @classmethod
389    def get_device_options(cls, options, test_source):
390        device_options = []
391        config_file = test_source.config_file
392        environment_config = []
393        from _core.testkit.json_parser import JsonParser
394        if test_source.source_string and is_config_str(
395                test_source.source_string):
396            json_config = JsonParser(test_source.source_string)
397            environment_config = json_config.get_environment()
398            device_options = cls._find_device_options(
399                environment_config, options, test_source)
400        elif config_file and os.path.exists(config_file):
401            json_config = JsonParser(test_source.config_file)
402            environment_config = json_config.get_environment()
403            device_options = cls._find_device_options(
404                environment_config, options, test_source)
405
406        device_options = cls._calculate_device_options(
407            device_options, environment_config, options, test_source)
408
409        if ConfigConst.component_mapper in options.keys():
410            required_component = options.get(ConfigConst.component_mapper). \
411                get(test_source.module_name, None)
412            for device_option in device_options:
413                device_option.required_component = required_component
414        return device_options
415
416    @staticmethod
417    def __free_environment__(environment):
418        env_manager = EnvironmentManager()
419        env_manager.release_environment(environment)
420
421    @staticmethod
422    def __reset_environment__(used_devices):
423        env_manager = EnvironmentManager()
424        env_manager.reset_environment(used_devices)
425
426    @classmethod
427    def _check_device_spt(cls, kit, driver_request, device):
428        kit_spt = cls._parse_property_value(ConfigConst.spt,
429                                            driver_request, kit)
430        if not kit_spt:
431            setattr(device, ConfigConst.task_state, False)
432            LOG.error("Spt is empty", error_no="00108")
433            return
434        if getattr(driver_request, ConfigConst.product_info, ""):
435            product_info = getattr(driver_request,
436                                   ConfigConst.product_info)
437            if not isinstance(product_info, dict):
438                LOG.warning("Product info should be dict, %s",
439                            product_info)
440                setattr(device, ConfigConst.task_state, False)
441                return
442            device_spt = product_info.get("Security Patch", None)
443            if not device_spt or not \
444                    Scheduler.compare_spt_time(kit_spt, device_spt):
445                LOG.error("The device %s spt is %s, "
446                          "and the test case spt is %s, "
447                          "which does not meet the requirements" %
448                          (device.device_sn, device_spt, kit_spt),
449                          error_no="00116")
450                setattr(device, ConfigConst.task_state, False)
451                return
452
    def _decc_task_setup(self, environment, task):
        """
        Run the task level kit setup on every device of an environment.

        :param environment: allocated environment, may be None
        :param task: the Task whose ``config.kits`` are set up
        :return: False when ``environment`` is None or when any device has
            its task state set to False after setup, True otherwise
        """
        # request handed to every kit; it carries a copy of the task config
        # plus the environment
        config = Config()
        config.update(task.config.__dict__)
        config.environment = environment
        driver_request = Request(config=config)

        if environment is None:
            return False

        for device in environment.devices:
            # devices already set up for this task are skipped
            if not getattr(device, ConfigConst.need_kit_setup, True):
                LOG.debug("Device %s need kit setup is false" % device)
                continue

            # do task setup for device; each device gets its own deep copy
            # of the task kits
            kits_copy = copy.deepcopy(task.config.kits)
            setattr(device, ConfigConst.task_kits, kits_copy)
            for kit in getattr(device, ConfigConst.task_kits, []):
                # stop setting up further kits once the scheduler stops
                if not Scheduler.is_execute:
                    break
                try:
                    kit.__setup__(device, request=driver_request)
                except (ParamError, ExecuteTerminate, DeviceError,
                        LiteDeviceError, ValueError, TypeError,
                        SyntaxError, AttributeError) as exception:
                    # a failing kit is logged only; the loop goes on
                    error_no = getattr(exception, "error_no", "00000")
                    LOG.exception(
                        "Task setup device: %s, exception: %s" % (
                            environment.__get_serial__(),
                            exception), exc_info=False, error_no=error_no)
                # the spt check runs for the query kit on ipcamera devices,
                # whether or not the kit setup raised
                if kit.__class__.__name__ == CKit.query and \
                        device.label in [DeviceLabelType.ipcamera]:
                    self._check_device_spt(kit, driver_request, device)
            LOG.debug("Set device %s need kit setup to false" % device)
            setattr(device, ConfigConst.need_kit_setup, False)

        # a device whose task state was cleared fails the whole environment
        for device in environment.devices:
            if not getattr(device, ConfigConst.task_state, True):
                return False

        # set product_info to self.task, only when the task has none yet
        if getattr(driver_request, ConfigConst.product_info, "") and \
                not getattr(task, ConfigConst.product_info, ""):
            product_info = getattr(driver_request, ConfigConst.product_info)
            if not isinstance(product_info, dict):
                LOG.warning("Product info should be dict, %s",
                            product_info)
            else:
                setattr(task, ConfigConst.product_info, product_info)
        return True
503
    def _dynamic_concurrent_execute(self, task, used_devices):
        """
        Dispatch the task's test drivers to driver threads one by one.

        Each loop iteration handles the first driver of
        ``task.test_drivers`` and removes it from the list once it has been
        started, skipped or reported as not executed. After the loop the
        method waits for the queue monitor thread and tears the task kits
        down.

        :param task: the Task whose ``test_drivers`` are consumed
        :param used_devices: dict that collects the devices used by the
            started drivers
        """
        # initial params
        current_driver_threads = {}
        test_drivers = task.test_drivers
        message_queue = queue.Queue()
        # environments that failed the device check below
        task_unused_env = []

        # execute test drivers
        queue_monitor_thread = self._start_queue_monitor(
            message_queue, test_drivers, current_driver_threads)
        while test_drivers:
            # clear remaining test drivers when scheduler is terminated
            if not Scheduler.is_execute:
                LOG.info("Clear test drivers")
                self._clear_not_executed(task, test_drivers)
                break

            # get test driver and device
            test_driver = test_drivers[0]

            # retry run: modules that need no retry are not executed again,
            # their history result is appended instead
            if getattr(task.config, ConfigConst.history_report_path, ""):
                module_name = test_driver[1].source.module_name
                if not self.is_module_need_retry(task, module_name):
                    self._display_executing_process(None, test_driver,
                                                    test_drivers)
                    LOG.info("%s are passed, no need to retry" % module_name)
                    self._append_history_result(task, module_name)
                    LOG.info("")
                    test_drivers.pop(0)
                    continue

            if getattr(task.config, ConfigConst.component_mapper, ""):
                module_name = test_driver[1].source.module_name
                self.component_task_setup(task, module_name)

            # get environment
            try:
                environment = self.__allocate_environment__(
                    task.config.__dict__, test_driver)
            except DeviceError as exception:
                # the handler reports the driver and pops it from the list
                self._handle_device_error(exception, task, test_drivers)
                continue

            # terminated while allocating: release the devices and let the
            # check at the top of the loop clear the remaining drivers
            if not Scheduler.is_execute:
                if environment:
                    Scheduler.__free_environment__(environment)
                continue

            # optional device check in decc mode or when check_device is set
            if check_mode(ModeType.decc) or getattr(
                    task.config, ConfigConst.check_device, False):
                LOG.info("Start to check environment: %s" %
                         environment.__get_serial__())
                status = self._decc_task_setup(environment, task)
                if not status:
                    # failed check: release the environment, remember it
                    # for the teardown and report the driver as not run
                    Scheduler.__free_environment__(environment)
                    task_unused_env.append(environment)
                    error_message = "Load Error[00116]"
                    self.report_not_executed(task.config.report_path,
                                             [test_drivers[0]],
                                             error_message, task)
                    test_drivers.pop(0)
                    continue
                else:
                    LOG.info("Environment %s check success",
                             environment.__get_serial__())

            # display executing progress
            self._display_executing_process(environment, test_driver,
                                            test_drivers)

            # add to used devices and set need_kit_setup attribute
            self._append_used_devices(environment, used_devices)

            # start driver thread
            self._start_driver_thread(current_driver_threads, (
                environment, message_queue, task, test_driver))
            test_drivers.pop(0)

        # wait for all drivers threads finished and do kit teardown
        while True:
            if not queue_monitor_thread.is_alive():
                break
            time.sleep(3)

        self._do_taskkit_teardown(used_devices, task_unused_env)
589
590    @classmethod
591    def _append_history_result(cls, task, module_name):
592        history_report_path = getattr(
593            task.config, ConfigConst.history_report_path, "")
594        from _core.report.result_reporter import ResultReporter
595        params = ResultReporter.get_task_info_params(
596            history_report_path)
597
598        if not params or not params[ReportConst.data_reports]:
599            LOG.debug("Task info record data reports is empty")
600            return
601
602        report_data_dict = dict(params[ReportConst.data_reports])
603        if module_name not in report_data_dict.keys():
604            module_name_ = str(module_name).split(".")[0]
605            if module_name_ not in report_data_dict.keys():
606                LOG.error("%s not in data reports" % module_name)
607                return
608            module_name = module_name_
609
610        from xdevice import SuiteReporter
611        if check_mode(ModeType.decc):
612            virtual_report_path, report_result = SuiteReporter. \
613                get_history_result_by_module(module_name)
614            LOG.debug("Append history result: (%s, %s)" % (
615                virtual_report_path, report_result))
616            SuiteReporter.append_report_result(
617                (virtual_report_path, report_result))
618        else:
619            history_execute_result = report_data_dict.get(module_name, "")
620            LOG.info("Start copy %s" % history_execute_result)
621            file_name = get_filename_extension(history_execute_result)[0]
622            if os.path.exists(history_execute_result):
623                result_dir = \
624                    os.path.join(task.config.report_path, "result")
625                os.makedirs(result_dir, exist_ok=True)
626                target_execute_result = "%s.xml" % os.path.join(
627                    task.config.report_path, "result", file_name)
628                shutil.copyfile(history_execute_result, target_execute_result)
629                LOG.info("Copy %s to %s" % (
630                    history_execute_result, target_execute_result))
631            else:
632                error_msg = "Copy failed! %s not exists!" % \
633                            history_execute_result
634                raise ParamError(error_msg)
635
636    def _handle_device_error(self, exception, task, test_drivers):
637        self._display_executing_process(None, test_drivers[0], test_drivers)
638        error_message = "%s: %s" % \
639                        (get_instance_name(exception), exception)
640        LOG.exception(error_message, exc_info=False,
641                      error_no=exception.error_no)
642        if check_mode(ModeType.decc):
643            error_message = "Load Error[00104]"
644        self.report_not_executed(task.config.report_path, [test_drivers[0]],
645                                 error_message, task)
646
647        LOG.info("")
648        test_drivers.pop(0)
649
650    @classmethod
651    def _clear_not_executed(cls, task, test_drivers):
652        if Scheduler.mode != ModeType.decc:
653            # clear all
654            test_drivers.clear()
655            return
656        # The result is reported only in DECC mode, and also clear all.
657        LOG.error("Case no run: task execution terminated!", error_no="00300")
658        error_message = "Execute Terminate[00300]"
659        cls.report_not_executed(task.config.report_path, test_drivers,
660                                error_message)
661        test_drivers.clear()
662
663    @classmethod
664    def report_not_executed(cls, report_path, test_drivers, error_message,
665                            task=None):
666        # traversing list to get remained elements
667        for test_driver in test_drivers:
668            # get report file
669            if task and getattr(task.config, "testdict", ""):
670                report_file = os.path.join(get_sub_path(
671                    test_driver[1].source.source_file),
672                    "%s.xml" % test_driver[1].source.test_name)
673            else:
674                report_file = os.path.join(
675                    report_path, "result",
676                    "%s.xml" % test_driver[1].source.module_name)
677
678            # get report name
679            report_name = test_driver[1].source.test_name if \
680                not test_driver[1].source.test_name.startswith("{") \
681                else "report"
682
683            # get module name
684            module_name = test_driver[1].source.module_name
685
686            # here, normally create empty report and then upload result
687            check_result_report(report_path, report_file, error_message,
688                                report_name, module_name)
689
690    def _start_driver_thread(self, current_driver_threads, thread_params):
691        environment, message_queue, task, test_driver = thread_params
692        thread_id = self._get_thread_id(current_driver_threads)
693        driver_thread = DriversThread(test_driver, task, environment,
694                                      message_queue)
695        driver_thread.setDaemon(True)
696        driver_thread.set_thread_id(thread_id)
697        driver_thread.set_listeners(self._create_listeners(task))
698        driver_thread.start()
699        current_driver_threads.setdefault(thread_id, driver_thread)
700
701    @classmethod
702    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
703        for device in used_devices.values():
704            if getattr(device, ConfigConst.need_kit_setup, True):
705                continue
706
707            for kit in getattr(device, ConfigConst.task_kits, []):
708                try:
709                    kit.__teardown__(device)
710                except Exception as error:
711                    LOG.debug("Do task kit teardown: %s" % error)
712            setattr(device, ConfigConst.task_kits, [])
713            setattr(device, ConfigConst.need_kit_setup, True)
714
715        for environment in task_unused_env:
716            for device in environment.devices:
717                setattr(device, ConfigConst.task_state, True)
718                setattr(device, ConfigConst.need_kit_setup, True)
719
720    def _display_executing_process(self, environment, test_driver,
721                                   test_drivers):
722        source_content = test_driver[1].source.source_file or \
723                         test_driver[1].source.source_string
724        if environment is None:
725            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
726                     (self.test_number - len(test_drivers) + 1,
727                      self.test_number, source_content,
728                      test_driver[1].source.test_type))
729            return
730
731        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
732                 (self.test_number - len(test_drivers) + 1,
733                  self.test_number, source_content,
734                  environment.__get_serial__(),
735                  test_driver[1].source.test_type))
736
737    @classmethod
738    def _get_thread_id(cls, current_driver_threads):
739        thread_id = get_cst_time().strftime(
740            '%Y-%m-%d-%H-%M-%S-%f')
741        while thread_id in current_driver_threads.keys():
742            thread_id = get_cst_time().strftime(
743                '%Y-%m-%d-%H-%M-%S-%f')
744        return thread_id
745
746    @classmethod
747    def _append_used_devices(cls, environment, used_devices):
748        if environment is not None:
749            for device in environment.devices:
750                device_serial = device.__get_serial__() if device else "None"
751                if device_serial and device_serial not in used_devices.keys():
752                    used_devices[device_serial] = device
753
754    @staticmethod
755    def _start_queue_monitor(message_queue, test_drivers,
756                             current_driver_threads):
757        queue_monitor_thread = QueueMonitorThread(message_queue,
758                                                  current_driver_threads,
759                                                  test_drivers)
760        queue_monitor_thread.setDaemon(True)
761        queue_monitor_thread.start()
762        return queue_monitor_thread
763
764    def exec_command(self, command, options):
765        """
766        Directly executes a command without adding it to the command queue.
767        """
768        if command != "run":
769            raise ParamError("unsupported command action: %s" % command,
770                             error_no="00100")
771        exec_type = options.exectype
772        if exec_type in [TestExecType.device_test, TestExecType.host_test,
773                         TestExecType.host_driven_test]:
774            self._exec_task(options)
775        else:
776            LOG.error("Unsupported execution type '%s'" % exec_type,
777                      error_no="00100")
778
779        return
780
781    def _exec_task(self, options):
782        """
783        Directly allocates a device and execute a device test.
784        """
785        try:
786            self.check_auto_retry(options)
787            task = self.__discover__(options.__dict__)
788            self.__execute__(task)
789        except (ParamError, ValueError, TypeError, SyntaxError,
790                AttributeError) as exception:
791            error_no = getattr(exception, "error_no", "00000")
792            LOG.exception("%s: %s" % (get_instance_name(exception), exception),
793                          exc_info=False, error_no=error_no)
794            if Scheduler.upload_address:
795                Scheduler.upload_unavailable_result(str(exception.args))
796                Scheduler.upload_report_end()
797        finally:
798            self.stop_task_logcat()
799            self.stop_encrypt_log()
800            self.start_auto_retry()
801
802    @classmethod
803    def _reset_environment(cls, environment="", config_file=""):
804        env_manager = EnvironmentManager()
805        env_manager.env_stop()
806        EnvironmentManager(environment, config_file)
807
808    @classmethod
809    def _restore_environment(cls):
810        env_manager = EnvironmentManager()
811        env_manager.env_stop()
812        EnvironmentManager()
813
814    @classmethod
815    def start_task_log(cls, log_path):
816        tool_file_name = "task_log.log"
817        tool_log_file = os.path.join(log_path, tool_file_name)
818        add_task_file_handler(tool_log_file)
819
820    @classmethod
821    def start_encrypt_log(cls, log_path):
822        from _core.report.encrypt import check_pub_key_exist
823        if check_pub_key_exist():
824            encrypt_file_name = "task_log.ept"
825            encrypt_log_file = os.path.join(log_path, encrypt_file_name)
826            add_encrypt_file_handler(encrypt_log_file)
827
    @classmethod
    def stop_task_logcat(cls):
        """
        Remove the task file handler by calling remove_task_file_handler().
        """
        remove_task_file_handler()
831
    @classmethod
    def stop_encrypt_log(cls):
        """
        Remove the encrypt file handler by calling
        remove_encrypt_file_handler().
        """
        remove_encrypt_file_handler()
835
836    @staticmethod
837    def _find_test_root_descriptor(config):
838        if getattr(config, ConfigConst.task, None) or \
839                getattr(config, ConfigConst.testargs, None):
840            Scheduler._pre_component_test(config)
841
842        if getattr(config, ConfigConst.subsystems, "") or \
843                getattr(config, ConfigConst.parts, "") or \
844                getattr(config, ConfigConst.component_base_kit, ""):
845            uid = unique_id("Scheduler", "component")
846            if config.subsystems or config.parts:
847                test_set = (config.subsystems, config.parts)
848            else:
849                kit = getattr(config, ConfigConst.component_base_kit)
850                test_set = kit.get_white_list()
851
852            root = Descriptor(uuid=uid, name="component",
853                              source=TestSetSource(test_set),
854                              container=True)
855
856            root.children = find_test_descriptors(config)
857            return root
858            # read test list from testdict
859        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
860                config, ConfigConst.testfile, "") == "":
861            uid = unique_id("Scheduler", "testdict")
862            root = Descriptor(uuid=uid, name="testdict",
863                              source=TestSetSource(config.testdict),
864                              container=True)
865            root.children = find_testdict_descriptors(config)
866            return root
867
868            # read test list from testfile, testlist or task
869        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
870            config, ConfigConst.testlist, "") or getattr(
871            config, ConfigConst.task, "") or getattr(
872            config, ConfigConst.testcase)
873        # read test list from testfile, testlist or task
874        test_set = getattr(config, "testfile", "") or getattr(
875            config, "testlist", "") or getattr(config, "task", "") or getattr(
876            config, "testcase")
877        if test_set:
878            fname, _ = get_filename_extension(test_set)
879            uid = unique_id("Scheduler", fname)
880            root = Descriptor(uuid=uid, name=fname,
881                              source=TestSetSource(test_set), container=True)
882            root.children = find_test_descriptors(config)
883            return root
884        else:
885            raise ParamError("no test file, list, dict, case or task found",
886                             error_no="00102")
887
888    @classmethod
889    def terminate_cmd_exec(cls):
890        Scheduler.is_execute = False
891        Scheduler.auto_retry = -1
892        LOG.info("Start to terminate execution")
893        return Scheduler.terminate_result.get()
894
895    @classmethod
896    def upload_case_result(cls, upload_param):
897        if not Scheduler.upload_address:
898            return
899        case_id, result, error, start_time, end_time, report_path = \
900            upload_param
901        if error and len(error) > MAX_VISIBLE_LENGTH:
902            error = "%s..." % error[:MAX_VISIBLE_LENGTH]
903        LOG.info(
904            "Get upload params: %s, %s, %s, %s, %s, %s" % (
905                case_id, result, error, start_time, end_time, report_path))
906        if Scheduler.proxy is not None:
907            Scheduler.proxy.upload_result(case_id, result, error, start_time,
908                                          end_time, report_path)
909        else:
910            LOG.debug("There is no proxy, can't upload case result")
911
912    @classmethod
913    def upload_module_result(cls, exec_message):
914        if not Scheduler.is_execute:
915            return
916        result_file = exec_message.get_result()
917        request = exec_message.get_request()
918
919        test_name = request.root.source.test_name
920        if not result_file or not os.path.exists(result_file):
921            LOG.error("%s result not exists", test_name, error_no="00200")
922            return
923
924        test_type = request.root.source.test_type
925        LOG.info("Need upload result: %s, test type: %s" %
926                 (result_file, test_type))
927        upload_params, _, _ = cls._get_upload_params(result_file, request)
928        if not upload_params:
929            LOG.error("%s no test case result to upload" % result_file,
930                      error_no="00201")
931            return
932        LOG.info("Need upload %s case" % len(upload_params))
933        upload_suite = []
934        for upload_param in upload_params:
935            case_id, result, error, start_time, end_time, report_path = \
936                upload_param
937            case = {"caseid": case_id, "result": result, "error": error,
938                    "start": start_time, "end": end_time,
939                    "report": report_path}
940            LOG.info("Case info: %s", case)
941            upload_suite.append(case)
942        if Scheduler.proxy is not None:
943            Scheduler.proxy.upload_batch(upload_suite)
944        else:
945            LOG.debug("There is no proxy, can't upload module result")
946
947    @classmethod
948    def _get_upload_params(cls, result_file, request):
949        upload_params = []
950        report_path = result_file
951        testsuites_element = DataHelper.parse_data_report(report_path)
952        start_time, end_time = cls._get_time(testsuites_element)
953        if request.get_test_type() == HostDrivenTestType.device_test:
954            for model_element in testsuites_element:
955                case_id = model_element.get(ReportConstant.name, "")
956                case_result, error = cls.get_script_result(model_element)
957                if error and len(error) > MAX_VISIBLE_LENGTH:
958                    error = "$s..." % error[:MAX_VISIBLE_LENGTH]
959                upload_params.append(
960                    (case_id, case_result, error, start_time,
961                     end_time, request.config.report_path,))
962        else:
963            for testsuite_element in testsuites_element:
964                if check_mode(ModeType.developer):
965                    module_name = str(get_filename_extension(
966                        report_path)[0]).split(".")[0]
967                else:
968                    module_name = testsuite_element.get(ReportConstant.name,
969                                                        "none")
970                for case_element in testsuite_element:
971                    case_id = cls._get_case_id(case_element, module_name)
972                    case_result, error = cls._get_case_result(case_element)
973                    if error and len(error) > MAX_VISIBLE_LENGTH:
974                        error = "%s..." % error[:MAX_VISIBLE_LENGTH]
975                    if case_result == "Ignored":
976                        LOG.info("Get upload params: %s result is ignored",
977                                 case_id)
978                        continue
979                    upload_params.append(
980                        (case_id, case_result, error, start_time,
981                         end_time, request.config.report_path,))
982        return upload_params, start_time, end_time
983
984    @classmethod
985    def get_script_result(cls, model_element):
986        disabled = int(model_element.get(ReportConstant.disabled)) if \
987            model_element.get(ReportConstant.disabled, "") else 0
988        failures = int(model_element.get(ReportConstant.failures)) if \
989            model_element.get(ReportConstant.failures, "") else 0
990        errors = int(model_element.get(ReportConstant.errors)) if \
991            model_element.get(ReportConstant.errors, "") else 0
992        unavailable = int(model_element.get(ReportConstant.unavailable)) if \
993            model_element.get(ReportConstant.unavailable, "") else 0
994        if failures > 0 or errors > 0:
995            result = "Failed"
996        elif disabled > 0 or unavailable > 0:
997            result = "Unavailable"
998        else:
999            result = "Passed"
1000
1001        if result == "Passed":
1002            return result, ""
1003        if Scheduler.mode == ModeType.decc:
1004            result = "Failed"
1005
1006        error_msg = model_element.get(ReportConstant.message, "")
1007        if not error_msg and len(model_element) > 0:
1008            error_msg = model_element[0].get(ReportConstant.message, "")
1009            if not error_msg and len(model_element[0]) > 0:
1010                error_msg = model_element[0][0].get(ReportConstant.message, "")
1011        return result, error_msg
1012
1013    @classmethod
1014    def _get_case_id(cls, case_element, package_name):
1015        class_name = case_element.get(ReportConstant.class_name, "none")
1016        method_name = case_element.get(ReportConstant.name, "none")
1017        case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name,
1018                                       class_name, method_name)
1019        return case_id
1020
1021    @classmethod
1022    def _get_case_result(cls, case_element):
1023        # get result
1024        case = Case()
1025        case.status = case_element.get(ReportConstant.status, "")
1026        case.result = case_element.get(ReportConstant.result, "")
1027        if case_element.get(ReportConstant.message, ""):
1028            case.message = case_element.get(ReportConstant.message)
1029        if len(case_element) > 0:
1030            if not case.result:
1031                case.result = ReportConstant.false
1032            case.message = case_element[0].get(ReportConstant.message)
1033        if case.is_passed():
1034            result = "Passed"
1035        elif case.is_failed():
1036            result = "Failed"
1037        elif case.is_blocked():
1038            result = "Blocked"
1039        elif case.is_ignored():
1040            result = "Ignored"
1041        else:
1042            result = "Unavailable"
1043        return result, case.message
1044
1045    @classmethod
1046    def _get_time(cls, testsuite_element):
1047        start_time = testsuite_element.get(ReportConstant.start_time, "")
1048        end_time = testsuite_element.get(ReportConstant.end_time, "")
1049        try:
1050            if start_time and end_time:
1051                start_time = int(time.mktime(time.strptime(
1052                    start_time, ReportConstant.time_format)) * 1000)
1053                end_time = int(time.mktime(time.strptime(
1054                    end_time, ReportConstant.time_format)) * 1000)
1055            else:
1056                timestamp = str(testsuite_element.get(
1057                    ReportConstant.time_stamp, "")).replace("T", " ")
1058                cost_time = testsuite_element.get(ReportConstant.time, "")
1059                if timestamp and cost_time:
1060                    try:
1061                        end_time = int(time.mktime(time.strptime(
1062                            timestamp, ReportConstant.time_format)) * 1000)
1063                    except ArithmeticError as error:
1064                        LOG.error("Get time error %s" % error)
1065                        end_time = int(time.time() * 1000)
1066                    start_time = int(end_time - float(cost_time) * 1000)
1067                else:
1068                    current_time = int(time.time() * 1000)
1069                    start_time, end_time = current_time, current_time
1070        except ArithmeticError as error:
1071            LOG.error("Get time error %s" % error)
1072            current_time = int(time.time() * 1000)
1073            start_time, end_time = current_time, current_time
1074        return start_time, end_time
1075
1076    @classmethod
1077    def upload_task_result(cls, task, error_message=""):
1078        if not Scheduler.task_name:
1079            LOG.info("No need upload summary report")
1080            return
1081
1082        summary_data_report = os.path.join(task.config.report_path,
1083                                           ReportConstant.summary_data_report)
1084        if not os.path.exists(summary_data_report):
1085            Scheduler.upload_unavailable_result(str(
1086                error_message) or "summary report not exists",
1087                                                task.config.report_path)
1088            return
1089
1090        task_element = ElementTree.parse(summary_data_report).getroot()
1091        start_time, end_time = cls._get_time(task_element)
1092        task_result = cls._get_task_result(task_element)
1093        error_msg = ""
1094        for child in task_element:
1095            if child.get(ReportConstant.message, ""):
1096                error_msg = "{}{}".format(
1097                    error_msg, "%s;" % child.get(ReportConstant.message))
1098        if error_msg:
1099            error_msg = error_msg[:-1]
1100        cls.upload_case_result((Scheduler.task_name, task_result,
1101                                error_msg, start_time, end_time,
1102                                task.config.report_path))
1103
1104    @classmethod
1105    def _get_task_result(cls, task_element):
1106        failures = int(task_element.get(ReportConstant.failures, 0))
1107        errors = int(task_element.get(ReportConstant.errors, 0))
1108        disabled = int(task_element.get(ReportConstant.disabled, 0))
1109        unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1110        if disabled > 0:
1111            task_result = "Blocked"
1112        elif errors > 0 or failures > 0:
1113            task_result = "Failed"
1114        elif unavailable > 0:
1115            task_result = "Unavailable"
1116        else:
1117            task_result = "Passed"
1118        return task_result
1119
1120    @classmethod
1121    def upload_unavailable_result(cls, error_msg, report_path=""):
1122        start_time = int(time.time() * 1000)
1123        Scheduler.upload_case_result((Scheduler.task_name, "Unavailable",
1124                                      error_msg, start_time, start_time,
1125                                      report_path))
1126
1127    @classmethod
1128    def upload_report_end(cls):
1129        if getattr(cls, "tmp_json", None):
1130            os.remove(cls.tmp_json)
1131            del cls.tmp_json
1132        LOG.info("Upload report end")
1133        if Scheduler.proxy is not None:
1134            Scheduler.proxy.report_end()
1135        else:
1136            LOG.debug("There is no proxy, can't upload report end")
1137
1138    @classmethod
1139    def is_module_need_retry(cls, task, module_name):
1140        failed_flag = False
1141        if check_mode(ModeType.decc):
1142            from xdevice import SuiteReporter
1143            for module, failed in SuiteReporter.get_failed_case_list():
1144                if module_name == module or str(module_name).split(
1145                        ".")[0] == module:
1146                    failed_flag = True
1147                    break
1148        else:
1149            from xdevice import ResultReporter
1150            history_report_path = \
1151                getattr(task.config, ConfigConst.history_report_path, "")
1152            params = ResultReporter.get_task_info_params(history_report_path)
1153            if params and params[ReportConst.unsuccessful_params]:
1154                if dict(params[ReportConst.unsuccessful_params]).get(module_name, []):
1155                    failed_flag = True
1156                elif dict(params[ReportConst.unsuccessful_params]).get(str(module_name).split(".")[0], []):
1157                    failed_flag = True
1158        return failed_flag
1159
1160    @classmethod
1161    def compare_spt_time(cls, kit_spt, device_spt):
1162        if not kit_spt or not device_spt:
1163            return False
1164        try:
1165            kit_time = str(kit_spt).split("-")[:2]
1166            device_time = str(device_spt).split("-")[:2]
1167            k_spt = datetime.datetime.strptime(
1168                "-".join(kit_time), "%Y-%m")
1169            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
1170        except ValueError as value_error:
1171            LOG.debug("Date format is error, %s" % value_error.args)
1172            return False
1173        month_interval = int(k_spt.month) - int(d_spt.month)
1174        year_interval = int(k_spt.year) - int(d_spt.year)
1175        LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
1176                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
1177        if year_interval < 0:
1178            return True
1179        if year_interval == 0 and month_interval in range(-11, 3):
1180            return True
1181        if year_interval == 1 and month_interval + 12 in (1, 2):
1182            return True
1183
1184    @classmethod
1185    def _parse_property_value(cls, property_name, driver_request, kit):
1186        test_args = copy.deepcopy(
1187            driver_request.config.get(ConfigConst.testargs, dict()))
1188        property_value = ""
1189        if ConfigConst.pass_through in test_args.keys():
1190            import json
1191            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
1192            property_value = pt_dict.get(property_name, None)
1193        elif property_name in test_args.keys:
1194            property_value = test_args.get(property_name, None)
1195        return property_value if property_value else \
1196            kit.properties.get(property_name, None)
1197
1198    @classmethod
1199    def _calculate_device_options(cls, device_options, environment_config,
1200                                  options, test_source):
1201        # calculate difference
1202        diff_value = len(environment_config) - len(device_options)
1203        if device_options and diff_value == 0:
1204            return device_options
1205
1206        else:
1207            diff_value = diff_value if diff_value else 1
1208            if str(test_source.source_file).endswith(".bin"):
1209                device_option = DeviceSelectionOption(
1210                    options, DeviceLabelType.ipcamera, test_source)
1211            else:
1212                device_option = DeviceSelectionOption(
1213                    options, None, test_source)
1214
1215            device_option.source_file = \
1216                test_source.source_file or test_source.source_string
1217            device_option.required_manager = "device"
1218            device_options.extend([device_option] * diff_value)
1219            LOG.debug("Assign device options and it's length is %s"
1220                      % len(device_options))
1221        return device_options
1222
    @classmethod
    def update_test_type_in_source(cls, key, value):
        """
        Set TestDictSource.test_type[key] to value.
        """
        LOG.debug("update test type dict in source")
        TestDictSource.test_type[key] = value
1227
    @classmethod
    def update_ext_type_in_source(cls, key, value):
        """
        Set TestDictSource.exe_type[key] to value.
        """
        LOG.debug("update ext type dict in source")
        TestDictSource.exe_type[key] = value
1232
    @classmethod
    def clear_test_dict_source(cls):
        """
        Delegate to TestDictSource.clear().
        """
        TestDictSource.clear()
1236
    @classmethod
    def reset_test_dict_source(cls):
        """
        Delegate to TestDictSource.reset().
        """
        TestDictSource.reset()
1240
1241    @classmethod
1242    def _pre_component_test(cls, config):
1243        if not config.kits:
1244            return
1245        cur_kit = None
1246        for kit in config.kits:
1247            if kit.__class__.__name__ == CKit.component:
1248                cur_kit = kit
1249                break
1250        if not cur_kit:
1251            return
1252        get_white_list = getattr(cur_kit, "get_white_list", None)
1253        if not callable(get_white_list):
1254            return
1255        subsystems, parts = get_white_list()
1256        if not subsystems and not parts:
1257            return
1258        setattr(config, ConfigConst.component_base_kit, cur_kit)
1259
1260    @classmethod
1261    def component_task_setup(cls, task, module_name):
1262        component_kit = task.config.get(ConfigConst.component_base_kit, None)
1263        if not component_kit:
1264            # only -p -s .you do not care about the components that can be
1265            # supported. you only want to run the use cases of the current
1266            # component
1267            return
1268        LOG.debug("Start component task setup")
1269        _component_mapper = task.config.get(ConfigConst.component_mapper)
1270        _subsystem, _part = _component_mapper.get(module_name)
1271
1272        is_hit = False
1273        # find in cache. if not find, update cache
1274        cache_subsystem, cache_part = component_kit.get_cache()
1275        if _subsystem in cache_subsystem or _part in cache_subsystem:
1276            is_hit = True
1277        if not is_hit:
1278            env_manager = EnvironmentManager()
1279            for _, manager in env_manager.managers.items():
1280                if getattr(manager, "devices_list", []):
1281                    for device in manager.devices_list:
1282                        component_kit.__setup__(device)
1283            cache_subsystem, cache_part = component_kit.get_cache()
1284            if _subsystem in cache_subsystem or _part in cache_subsystem:
1285                is_hit = True
1286        if not is_hit:
1287            LOG.warning("%s are skipped, no suitable component found. "
1288                        "Require subsystem=%s part=%s, no device match this"
1289                        % (module_name, _subsystem, _part))
1290
1291    @classmethod
1292    def start_auto_retry(cls):
1293        if not Scheduler.is_need_auto_retry:
1294            Scheduler.auto_retry = -1
1295            LOG.debug("No need auto retry")
1296            return
1297        if Scheduler.auto_retry > 0:
1298            Scheduler.auto_retry -= 1
1299            if Scheduler.auto_retry == 0:
1300                Scheduler.auto_retry = -1
1301            from _core.command.console import Console
1302            console = Console()
1303            console.command_parser("run --retry")
1304
1305    @classmethod
1306    def check_auto_retry(cls, options):
1307        if Scheduler.auto_retry < 0 and int(getattr(options, ConfigConst.auto_retry, 0)) > 0:
1308            value = int(getattr(options, ConfigConst.auto_retry, 0))
1309            Scheduler.auto_retry = value if value <= 10 else 10
1310