• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import datetime
21import os
22import queue
23import time
24import uuid
25from xml.etree import ElementTree
26
27from _core.utils import unique_id
28from _core.utils import check_mode
29from _core.utils import get_sub_path
30from _core.utils import get_filename_extension
31from _core.utils import convert_serial
32from _core.utils import get_instance_name
33from _core.utils import is_config_str
34from _core.utils import check_result_report
35from _core.environment.manager_env import EnvironmentManager
36from _core.environment.manager_env import DeviceSelectionOption
37from _core.exception import ParamError
38from _core.exception import ExecuteTerminate
39from _core.exception import LiteDeviceError
40from _core.exception import DeviceError
41from _core.interface import LifeCycle
42from _core.executor.request import Request
43from _core.executor.request import Task
44from _core.executor.request import Descriptor
45from _core.plugin import get_plugin
46from _core.plugin import Plugin
47from _core.plugin import Config
48from _core.report.reporter_helper import ExecInfo
49from _core.report.reporter_helper import ReportConstant
50from _core.report.reporter_helper import Case
51from _core.report.reporter_helper import DataHelper
52from _core.constants import TestExecType
53from _core.constants import CKit
54from _core.constants import ModeType
55from _core.constants import DeviceLabelType
56from _core.constants import SchedulerType
57from _core.constants import ListenerType
58from _core.constants import ConfigConst
59from _core.executor.concurrent import DriversThread
60from _core.executor.concurrent import QueueMonitorThread
61from _core.executor.source import TestSetSource
62from _core.executor.source import find_test_descriptors
63from _core.executor.source import find_testdict_descriptors
64from _core.executor.source import TestDictSource
65from _core.logger import platform_logger
66from _core.logger import add_task_file_handler
67from _core.logger import remove_task_file_handler
68from _core.logger import add_encrypt_file_handler
69from _core.logger import remove_encrypt_file_handler
70
71__all__ = ["Scheduler"]
72LOG = platform_logger("Scheduler")
73
74
75@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
76class Scheduler(object):
77    """
78    The Scheduler is the main entry point for client code that wishes to
79    discover and execute tests.
80    """
81    # factory params
82    is_execute = True
83    terminate_result = queue.Queue()
84    upload_address = ""
85    task_type = ""
86    task_name = ""
87    mode = ""
88    proxy = None
89
90    # command_queue to store test commands
91    command_queue = []
92    max_command_num = 50
93    # the number of tests in current task
94    test_number = 0
95
96    def __discover__(self, args):
97        """discover task to execute"""
98        config = Config()
99        config.update(args)
100        task = Task(drivers=[])
101        task.init(config)
102
103        TestDictSource.reset()
104        root_descriptor = self._find_test_root_descriptor(task.config)
105        task.set_root_descriptor(root_descriptor)
106        return task
107
108    def __execute__(self, task):
109        error_message = ""
110        try:
111            Scheduler.is_execute = True
112            if Scheduler.command_queue:
113                LOG.debug("Run command: %s" % Scheduler.command_queue[-1])
114                run_command = Scheduler.command_queue.pop()
115                task_id = str(uuid.uuid1()).split("-")[0]
116                Scheduler.command_queue.append((task_id, run_command,
117                                                task.config.report_path))
118                if len(Scheduler.command_queue) > self.max_command_num:
119                    Scheduler.command_queue.pop(0)
120
121            if getattr(task.config, ConfigConst.test_environment, ""):
122                self._reset_environment(task.config.get(
123                    ConfigConst.test_environment, ""))
124            elif getattr(task.config, ConfigConst.configfile, ""):
125                self._reset_environment(config_file=task.config.get(
126                    ConfigConst.configfile, ""))
127
128            # do with the count of repeat about a task
129            if getattr(task.config, ConfigConst.repeat, 0) > 0:
130                drivers_list = list()
131                for repeat_index in range(task.config.repeat):
132                    for driver_index in range(len(task.test_drivers)):
133                        drivers_list.append(
134                            copy.deepcopy(task.test_drivers[driver_index]))
135                task.test_drivers = drivers_list
136
137            self.test_number = len(task.test_drivers)
138
139            if task.config.exectype == TestExecType.device_test:
140                self._device_test_execute(task)
141            elif task.config.exectype == TestExecType.host_test:
142                self._host_test_execute(task)
143            else:
144                LOG.info("exec type %s is bypassed" % task.config.exectype)
145
146        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
147                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
148            error_no = getattr(exception, "error_no", "")
149            error_message = "%s[%s]" % (str(exception), error_no) \
150                if error_no else str(exception)
151            error_no = error_no if error_no else "00000"
152            LOG.exception(exception, exc_info=True, error_no=error_no)
153
154        finally:
155            Scheduler._clear_test_dict_source()
156            if getattr(task.config, ConfigConst.test_environment, "") or \
157                    getattr(task.config, ConfigConst.configfile, ""):
158                self._restore_environment()
159
160            if Scheduler.upload_address:
161                Scheduler.upload_task_result(task, error_message)
162                Scheduler.upload_report_end()
163
164    def _device_test_execute(self, task):
165        used_devices = {}
166        try:
167            self._dynamic_concurrent_execute(task, used_devices)
168        finally:
169            # generate reports
170            self._generate_task_report(task, used_devices)
171
172    def _host_test_execute(self, task):
173        """execute host test"""
174        try:
175            # initial params
176            current_driver_threads = {}
177            test_drivers = task.test_drivers
178            message_queue = queue.Queue()
179
180            # execute test drivers
181            queue_monitor_thread = self._start_queue_monitor(
182                message_queue, test_drivers, current_driver_threads)
183            while test_drivers:
184                if len(current_driver_threads) > 5:
185                    time.sleep(3)
186                    continue
187
188                # clear remaining test drivers when scheduler is terminated
189                if not Scheduler.is_execute:
190                    LOG.info("clear test drivers")
191                    self._clear_not_executed(task, test_drivers)
192                    break
193
194                # get test driver and device
195                test_driver = test_drivers[0]
196
197                # display executing progress
198                self._display_executing_process(None, test_driver,
199                                                test_drivers)
200
201                # start driver thread
202                self._start_driver_thread(current_driver_threads, (
203                    None, message_queue, task, test_driver))
204                test_drivers.pop(0)
205
206            # wait for all drivers threads finished and do kit teardown
207            while True:
208                if not queue_monitor_thread.is_alive():
209                    break
210                time.sleep(3)
211
212        finally:
213            # generate reports
214            self._generate_task_report(task)
215
216    def _generate_task_report(self, task, used_devices=None):
217        task_info = ExecInfo()
218        test_type = getattr(task.config, "testtype", [])
219        task_name = getattr(task.config, "task", "")
220        if task_name:
221            task_info.test_type = str(task_name).upper()
222        else:
223            task_info.test_type = ",".join(test_type) if test_type else "Test"
224        if used_devices:
225            serials = []
226            platforms = []
227            for serial, device in used_devices.items():
228                serials.append(convert_serial(serial))
229                platform = str(device.label).capitalize()
230                if platform not in platforms:
231                    platforms.append(platform)
232            task_info.device_name = ",".join(serials)
233            task_info.platform = ",".join(platforms)
234        else:
235            task_info.device_name = "None"
236            task_info.platform = "None"
237        task_info.test_time = task.config.start_time
238        task_info.product_info = getattr(task, "product_info", "")
239
240        listeners = self._create_listeners(task)
241        for listener in listeners:
242            listener.__ended__(LifeCycle.TestTask, task_info,
243                               test_type=task_info.test_type)
244
245    @classmethod
246    def _create_listeners(cls, task):
247        listeners = []
248        # append log listeners
249        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
250        for log_listener in log_listeners:
251            log_listener_instance = log_listener.__class__()
252            listeners.append(log_listener_instance)
253        # append report listeners
254        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
255        for report_listener in report_listeners:
256            report_listener_instance = report_listener.__class__()
257            setattr(report_listener_instance, "report_path",
258                    task.config.report_path)
259            listeners.append(report_listener_instance)
260        # append upload listeners
261        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
262        for upload_listener in upload_listeners:
263            upload_listener_instance = upload_listener.__class__()
264            listeners.append(upload_listener_instance)
265        return listeners
266
267    @staticmethod
268    def _find_device_options(environment_config, options, test_source):
269        devices_option = []
270        for device_dict in environment_config:
271            label = device_dict.get("label", "")
272            if not label:
273                continue
274            device_option = DeviceSelectionOption(options, label, test_source)
275            device_dict.pop("type", None)
276            device_dict.pop("label", None)
277            device_option.extend_value = device_dict
278            device_option.source_file = test_source.config_file or \
279                                        test_source.source_string
280            devices_option.append(device_option)
281        return devices_option
282
283    def __allocate_environment__(self, options, test_driver):
284        device_options = self.get_device_options(options,
285                                                 test_driver[1].source)
286        environment = None
287        env_manager = EnvironmentManager()
288        while True:
289            if not Scheduler.is_execute:
290                break
291            environment = env_manager.apply_environment(device_options)
292            if len(environment.devices) == len(device_options):
293                return environment
294            else:
295                env_manager.release_environment(environment)
296                LOG.debug("'%s' is waiting available device",
297                          test_driver[1].source.test_name)
298                if env_manager.check_device_exist(device_options):
299                    continue
300                else:
301                    raise DeviceError("The '%s' required device does not exist"
302                                      % test_driver[1].source.source_file,
303                                      error_no="00104")
304
305        return environment
306
307    @classmethod
308    def get_device_options(cls, options, test_source):
309        device_options = []
310        config_file = test_source.config_file
311
312        from _core.testkit.json_parser import JsonParser
313        if test_source.source_string and is_config_str(
314                test_source.source_string):
315            json_config = JsonParser(test_source.source_string)
316            device_options = cls._find_device_options(
317                json_config.get_environment(), options, test_source)
318        elif config_file and os.path.exists(config_file):
319            json_config = JsonParser(test_source.config_file)
320            device_options = cls._find_device_options(
321                json_config.get_environment(), options, test_source)
322
323        if not device_options:
324            if str(test_source.source_file).endswith(".bin"):
325                device_option = DeviceSelectionOption(
326                    options, DeviceLabelType.ipcamera, test_source)
327            else:
328                device_option = DeviceSelectionOption(
329                    options, None, test_source)
330            device_option.source_file = test_source.source_file or \
331                                        test_source.source_string
332            device_options.append(device_option)
333
334        if ConfigConst.component_mapper in options.keys():
335            required_component = options.get(ConfigConst.component_mapper). \
336                get(test_source.module_name, None)
337            for device_option in device_options:
338                device_option.required_component = required_component
339
340        return device_options
341
342    @staticmethod
343    def __free_environment__(environment):
344        env_manager = EnvironmentManager()
345        env_manager.release_environment(environment)
346
347    def _check_device_spt(self, kit, driver_request, device):
348        kit_version = self._parse_version_id(driver_request, kit)
349        kit_spt = kit.properties.get(ConfigConst.spt, None)
350        if not kit_spt or not kit_version:
351            setattr(device, ConfigConst.task_state, False)
352            LOG.error("spt or version is empty", error_no="00108")
353            return
354        if getattr(driver_request, ConfigConst.product_info, ""):
355            product_info = getattr(driver_request,
356                                   ConfigConst.product_info)
357            if not isinstance(product_info, dict):
358                LOG.warning("product info should be dict, %s",
359                            product_info)
360                setattr(device, ConfigConst.task_state, False)
361                return
362            device_spt = product_info.get("Security Patch", None)
363            device_version = product_info.get("VersionID", None)
364            if not device_version or not device_version == kit_version:
365                LOG.error("The device %s VersionID is %s, "
366                          "and the test case version is %s, "
367                          "which does not meet the requirements" %
368                          (device.device_sn, device_version, kit_version),
369                          error_no="00116")
370                setattr(device, ConfigConst.task_state, False)
371                return
372
373            if not device_spt or not \
374                    Scheduler.compare_spt_time(kit_spt, device_spt):
375                LOG.error("The device %s spt is %s, "
376                          "and the test case spt is %s, "
377                          "which does not meet the requirements" %
378                          (device.device_sn, device_spt, kit_spt),
379                          error_no="00116")
380
381                setattr(device, ConfigConst.task_state, False)
382                return
383
384    def _decc_task_setup(self, environment, task):
385        config = Config()
386        config.update(task.config.__dict__)
387        config.environment = environment
388        driver_request = Request(config=config)
389
390        if environment is None:
391            return False
392
393        for device in environment.devices:
394            if not getattr(device, ConfigConst.need_kit_setup, True):
395                LOG.debug("device %s need kit setup is false" % device)
396                continue
397
398            # do task setup for device
399            kits_copy = copy.deepcopy(task.config.kits)
400            setattr(device, ConfigConst.task_kits, kits_copy)
401            for kit in getattr(device, ConfigConst.task_kits, []):
402                if not Scheduler.is_execute:
403                    break
404                try:
405                    kit.__setup__(device, request=driver_request)
406                except (ParamError, ExecuteTerminate, DeviceError,
407                        LiteDeviceError, ValueError, TypeError,
408                        SyntaxError, AttributeError) as exception:
409                    error_no = getattr(exception, "error_no", "00000")
410                    LOG.exception(
411                        "task setup device: %s, exception: %s" % (
412                            environment.__get_serial__(),
413                            exception), exc_info=False, error_no=error_no)
414                if kit.__class__.__name__ == CKit.query:
415                    self._check_device_spt(kit, driver_request, device)
416            LOG.debug("set device %s need kit setup to false" % device)
417            setattr(device, ConfigConst.need_kit_setup, False)
418
419        for device in environment.devices:
420            if not getattr(device, ConfigConst.task_state, True):
421                return False
422
423        # set product_info to self.task
424        if getattr(driver_request, ConfigConst.product_info, "") and \
425                not getattr(task, ConfigConst.product_info, ""):
426            product_info = getattr(driver_request, ConfigConst.product_info)
427            if not isinstance(product_info, dict):
428                LOG.warning("product info should be dict, %s",
429                            product_info)
430            else:
431                setattr(task, ConfigConst.product_info, product_info)
432        return True
433
434    def _dynamic_concurrent_execute(self, task, used_devices):
435        # initial params
436        current_driver_threads = {}
437        test_drivers = task.test_drivers
438        message_queue = queue.Queue()
439        task_unused_env = []
440
441        # execute test drivers
442        queue_monitor_thread = self._start_queue_monitor(
443            message_queue, test_drivers, current_driver_threads)
444        while test_drivers:
445            # clear remaining test drivers when scheduler is terminated
446            if not Scheduler.is_execute:
447                LOG.info("clear test drivers")
448                self._clear_not_executed(task, test_drivers)
449                break
450
451            # get test driver and device
452            test_driver = test_drivers[0]
453
454            if getattr(task.config, ConfigConst.history_report_path, ""):
455                module_name = test_driver[1].source.module_name
456                if not self.is_module_need_retry(task, module_name):
457                    self._display_executing_process(None, test_driver,
458                                                    test_drivers)
459                    LOG.info("%s are passed, no need to retry" % module_name)
460                    self._append_history_result(task, module_name)
461                    LOG.info("")
462                    test_drivers.pop(0)
463                    continue
464
465            if getattr(task.config, ConfigConst.component_mapper, ""):
466                module_name = test_driver[1].source.module_name
467                self.component_task_setup(task, module_name)
468
469            # get environment
470            try:
471                environment = self.__allocate_environment__(
472                    task.config.__dict__, test_driver)
473            except DeviceError as exception:
474                self._handle_device_error(exception, task, test_drivers)
475                continue
476
477            if not Scheduler.is_execute:
478                if environment:
479                    Scheduler.__free_environment__(environment)
480                continue
481
482            if check_mode(ModeType.decc) or getattr(
483                    task.config, ConfigConst.check_device, False):
484                LOG.info("start to check environment: %s" %
485                         environment.__get_serial__())
486                status = self._decc_task_setup(environment, task)
487                if not status:
488                    Scheduler.__free_environment__(environment)
489                    task_unused_env.append(environment)
490                    error_message = "Load Error[00116]"
491                    self.report_not_executed(task.config.report_path,
492                                             [test_drivers[0]],
493                                             error_message, task)
494                    test_drivers.pop(0)
495                    continue
496                else:
497                    LOG.info("environment %s check success",
498                             environment.__get_serial__())
499
500            # display executing progress
501            self._display_executing_process(environment, test_driver,
502                                            test_drivers)
503
504            # add to used devices and set need_kit_setup attribute
505            self._append_used_devices(environment, used_devices)
506
507            # start driver thread
508            self._start_driver_thread(current_driver_threads, (
509                environment, message_queue, task, test_driver))
510            test_drivers.pop(0)
511
512        # wait for all drivers threads finished and do kit teardown
513        while True:
514            if not queue_monitor_thread.is_alive():
515                break
516            time.sleep(3)
517
518        self._do_taskkit_teardown(used_devices, task_unused_env)
519
520    @classmethod
521    def _append_history_result(cls, task, module_name):
522        history_report_path = getattr(
523            task.config, ConfigConst.history_report_path, "")
524        from _core.report.result_reporter import ResultReporter
525        params = ResultReporter.get_task_info_params(
526            history_report_path)
527
528        if not params or not params[4]:
529            LOG.debug("task info record data reports is empty")
530            return
531
532        report_data_dict = dict(params[4])
533        if module_name not in report_data_dict.keys():
534            module_name_ = str(module_name).split(".")[0]
535            if module_name_ not in report_data_dict.keys():
536                LOG.error("%s not in data reports" % module_name)
537                return
538            module_name = module_name_
539
540        from xdevice import SuiteReporter
541        if check_mode(ModeType.decc):
542            virtual_report_path, report_result = SuiteReporter. \
543                get_history_result_by_module(module_name)
544            LOG.debug("append history result: (%s, %s)" % (
545                virtual_report_path, report_result))
546            SuiteReporter.append_report_result(
547                (virtual_report_path, report_result))
548        else:
549            import shutil
550            history_execute_result = report_data_dict.get(module_name, "")
551            LOG.info("start copy %s" % history_execute_result)
552            file_name = get_filename_extension(history_execute_result)[0]
553            if os.path.exists(history_execute_result):
554                result_dir = \
555                    os.path.join(task.config.report_path, "result")
556                os.makedirs(result_dir, exist_ok=True)
557                target_execute_result = "%s.xml" % os.path.join(
558                    task.config.report_path, "result", file_name)
559                shutil.copyfile(history_execute_result, target_execute_result)
560                LOG.info("copy %s to %s" % (
561                    history_execute_result, target_execute_result))
562            else:
563                error_msg = "copy failed! %s not exists!" % \
564                            history_execute_result
565                raise ParamError(error_msg)
566
567    def _handle_device_error(self, exception, task, test_drivers):
568        self._display_executing_process(None, test_drivers[0], test_drivers)
569        error_message = "%s: %s" % \
570                        (get_instance_name(exception), exception)
571        LOG.exception(error_message, exc_info=False,
572                      error_no=exception.error_no)
573        if check_mode(ModeType.decc):
574            error_message = "Load Error[00104]"
575        self.report_not_executed(task.config.report_path, [test_drivers[0]],
576                                 error_message, task)
577
578        LOG.info("")
579        test_drivers.pop(0)
580
581    @classmethod
582    def _clear_not_executed(cls, task, test_drivers):
583        if Scheduler.mode != ModeType.decc:
584            # clear all
585            test_drivers.clear()
586            return
587        # The result is reported only in DECC mode, and also clear all.
588        LOG.error("case no run: task execution terminated!", error_no="00300")
589        error_message = "Execute Terminate[00300]"
590        cls.report_not_executed(task.config.report_path, test_drivers,
591                                error_message)
592        test_drivers.clear()
593
594    @classmethod
595    def report_not_executed(cls, report_path, test_drivers, error_message,
596                            task=None):
597        # traversing list to get remained elements
598        for test_driver in test_drivers:
599            # get report file
600            if task and getattr(task.config, "testdict", ""):
601                report_file = os.path.join(get_sub_path(
602                    test_driver[1].source.source_file),
603                    "%s.xml" % test_driver[1].source.test_name)
604            else:
605                report_file = os.path.join(
606                    report_path, "result",
607                    "%s.xml" % test_driver[1].source.module_name)
608
609            # get report name
610            report_name = test_driver[1].source.test_name if \
611                not test_driver[1].source.test_name.startswith("{") \
612                else "report"
613
614            # get module name
615            module_name = test_driver[1].source.module_name
616
617            # here, normally create empty report and then upload result
618            check_result_report(report_path, report_file, error_message,
619                                report_name, module_name)
620
621    def _start_driver_thread(self, current_driver_threads, thread_params):
622        environment, message_queue, task, test_driver = thread_params
623        thread_id = self._get_thread_id(current_driver_threads)
624        driver_thread = DriversThread(test_driver, task, environment,
625                                      message_queue)
626        driver_thread.setDaemon(True)
627        driver_thread.set_thread_id(thread_id)
628        driver_thread.set_listeners(self._create_listeners(task))
629        driver_thread.start()
630        current_driver_threads.setdefault(thread_id, driver_thread)
631
632    @classmethod
633    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
634        for device in used_devices.values():
635            if getattr(device, ConfigConst.need_kit_setup, True):
636                continue
637
638            for kit in getattr(device, ConfigConst.task_kits, []):
639                try:
640                    kit.__teardown__(device)
641                except Exception as error:
642                    LOG.debug("do taskkit teardown: %s" % error)
643            setattr(device, ConfigConst.task_kits, [])
644            setattr(device, ConfigConst.need_kit_setup, True)
645
646        for environment in task_unused_env:
647            for device in environment.devices:
648                setattr(device, ConfigConst.task_state, True)
649                setattr(device, ConfigConst.need_kit_setup, True)
650
651    def _display_executing_process(self, environment, test_driver,
652                                   test_drivers):
653        source_content = test_driver[1].source.source_file or \
654                         test_driver[1].source.source_string
655        if environment is None:
656            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
657                     (self.test_number - len(test_drivers) + 1,
658                      self.test_number, source_content,
659                      test_driver[1].source.test_type))
660            return
661
662        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
663                 (self.test_number - len(test_drivers) + 1,
664                  self.test_number, source_content,
665                  environment.__get_serial__(),
666                  test_driver[1].source.test_type))
667
668    @classmethod
669    def _get_thread_id(cls, current_driver_threads):
670        thread_id = datetime.datetime.now().strftime(
671            '%Y-%m-%d-%H-%M-%S-%f')
672        while thread_id in current_driver_threads.keys():
673            thread_id = datetime.datetime.now().strftime(
674                '%Y-%m-%d-%H-%M-%S-%f')
675        return thread_id
676
677    @classmethod
678    def _append_used_devices(cls, environment, used_devices):
679        for device in environment.devices:
680            device_serial = device.__get_serial__() if device else "None"
681            if device_serial and device_serial not in used_devices.keys():
682                used_devices[device_serial] = device
683
684    @staticmethod
685    def _start_queue_monitor(message_queue, test_drivers,
686                             current_driver_threads):
687        queue_monitor_thread = QueueMonitorThread(message_queue,
688                                                  current_driver_threads,
689                                                  test_drivers)
690        queue_monitor_thread.setDaemon(True)
691        queue_monitor_thread.start()
692        return queue_monitor_thread
693
694    def exec_command(self, command, options):
695        """
696        Directly executes a command without adding it to the command queue.
697        """
698        if command != "run":
699            raise ParamError("unsupported command action: %s" % command,
700                             error_no="00100")
701        exec_type = options.exectype
702        if exec_type in [TestExecType.device_test, TestExecType.host_test,
703                         TestExecType.host_driven_test]:
704            self._exec_task(options)
705        else:
706            LOG.error("unsupported execution type '%s'" % exec_type,
707                      error_no="00100")
708
709        return
710
711    def _exec_task(self, options):
712        """
713        Directly allocates a device and execute a device test.
714        """
715        try:
716            task = self.__discover__(options.__dict__)
717            self.__execute__(task)
718        except (ParamError, ValueError, TypeError, SyntaxError,
719                AttributeError) as exception:
720            error_no = getattr(exception, "error_no", "00000")
721            LOG.exception("%s: %s" % (get_instance_name(exception), exception),
722                          exc_info=False, error_no=error_no)
723            if Scheduler.upload_address:
724                Scheduler.upload_unavailable_result(str(exception.args))
725                Scheduler.upload_report_end()
726        finally:
727            self.stop_task_log()
728            self.stop_encrypt_log()
729
730    @classmethod
731    def _reset_environment(cls, environment="", config_file=""):
732        env_manager = EnvironmentManager()
733        env_manager.env_stop()
734        EnvironmentManager(environment, config_file)
735
736    @classmethod
737    def _restore_environment(cls):
738        env_manager = EnvironmentManager()
739        env_manager.env_stop()
740        EnvironmentManager()
741
742    @classmethod
743    def start_task_log(cls, log_path):
744        tool_file_name = "task_log.log"
745        tool_log_file = os.path.join(log_path, tool_file_name)
746        add_task_file_handler(tool_log_file)
747
748    @classmethod
749    def start_encrypt_log(cls, log_path):
750        from _core.report.encrypt import check_pub_key_exist
751        if check_pub_key_exist():
752            encrypt_file_name = "task_log.ept"
753            encrypt_log_file = os.path.join(log_path, encrypt_file_name)
754            add_encrypt_file_handler(encrypt_log_file)
755
756    @classmethod
757    def stop_task_log(cls):
758        remove_task_file_handler()
759
760    @classmethod
761    def stop_encrypt_log(cls):
762        remove_encrypt_file_handler()
763
764    @staticmethod
765    def _find_test_root_descriptor(config):
766        if getattr(config, ConfigConst.task, None) or \
767                getattr(config, ConfigConst.testargs, None):
768            Scheduler._pre_component_test(config)
769
770        if getattr(config, ConfigConst.subsystems, "") or \
771                getattr(config, ConfigConst.parts, "") \
772                or getattr(config, ConfigConst.component_base_kit, ""):
773            uid = unique_id("Scheduler", "component")
774            if config.subsystems or config.parts:
775                test_set = (config.subsystems, config.parts)
776            else:
777                kit = getattr(config, ConfigConst.component_base_kit)
778                test_set = kit.get_white_list()
779
780            root = Descriptor(uuid=uid, name="component",
781                              source=TestSetSource(test_set),
782                              container=True)
783            root.children = find_test_descriptors(config)
784            return root
785
786        # read test list from testdict
787        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
788                config, ConfigConst.testfile, "") == "":
789            uid = unique_id("Scheduler", "testdict")
790            root = Descriptor(uuid=uid, name="testdict",
791                              source=TestSetSource(config.testdict),
792                              container=True)
793            root.children = find_testdict_descriptors(config)
794            return root
795
796        # read test list from testfile, testlist or task
797        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
798            config, ConfigConst.testlist, "") or getattr(
799            config, ConfigConst.task, "") or getattr(
800            config, ConfigConst.testcase)
801        if test_set:
802            fname, _ = get_filename_extension(test_set)
803            uid = unique_id("Scheduler", fname)
804            root = Descriptor(uuid=uid, name=fname,
805                              source=TestSetSource(test_set), container=True)
806            root.children = find_test_descriptors(config)
807            return root
808        else:
809            raise ParamError("no test file, list, dict, case or task found",
810                             error_no="00102")
811
812    @classmethod
813    def terminate_cmd_exec(cls):
814        Scheduler.is_execute = False
815        LOG.info("start to terminate execution")
816        return Scheduler.terminate_result.get()
817
818    @classmethod
819    def upload_case_result(cls, upload_param):
820        if not Scheduler.upload_address:
821            return
822        case_id, result, error, start_time, end_time, report_path = \
823            upload_param
824        if error and len(error) > 150:
825            error = "%s..." % error[:150]
826        LOG.info(
827            "get upload params: %s, %s, %s, %s, %s, %s" % (
828                case_id, result, error, start_time, end_time, report_path))
829        if Scheduler.proxy:
830            Scheduler.proxy.upload_result(case_id, result, error,
831                                          start_time, end_time, report_path)
832            return
833        from agent.factory import upload_result
834        upload_result(case_id, result, error, start_time, end_time,
835                      report_path)
836
837    @classmethod
838    def upload_module_result(cls, exec_message):
839        if not Scheduler.is_execute:
840            return
841        result_file = exec_message.get_result()
842        request = exec_message.get_request()
843
844        test_name = request.root.source.test_name
845        if not result_file or not os.path.exists(result_file):
846            LOG.error("%s result not exists", test_name, error_no="00200")
847            return
848
849        test_type = request.root.source.test_type
850        LOG.info("need upload result: %s, test type: %s" %
851                 (result_file, test_type))
852        upload_params, _, _ = cls._get_upload_params(result_file, request)
853        if not upload_params:
854            LOG.error("%s no test case result to upload" % result_file,
855                      error_no="00201")
856            return
857        LOG.info("need upload %s case" % len(upload_params))
858        upload_suite = []
859        for upload_param in upload_params:
860            case_id, result, error, start_time, end_time, report_path = \
861                upload_param
862            case = {"caseid": case_id, "result": result, "error": error,
863                    "start": start_time, "end": end_time,
864                    "report": report_path}
865            LOG.info("case info: %s", case)
866            upload_suite.append(case)
867        if Scheduler.proxy:
868            Scheduler.proxy.upload_result(case_id, result, error,
869                                          start_time, end_time, report_path)
870            return
871        from agent.factory import upload_batch
872        upload_batch(upload_suite)
873
874    @classmethod
875    def _get_upload_params(cls, result_file, request):
876        upload_params = []
877
878        report_path = result_file
879        task_log_path = os.path.join(request.config.report_path, "log",
880                                     "task_log.log")
881        testsuites_element = DataHelper.parse_data_report(report_path)
882        start_time, end_time = cls._get_time(testsuites_element)
883
884        for testsuite_element in testsuites_element:
885            if check_mode(ModeType.developer):
886                module_name = str(get_filename_extension(
887                    report_path)[0]).split(".")[0]
888            else:
889                module_name = testsuite_element.get(ReportConstant.name,
890                                                    "none")
891            for case_element in testsuite_element:
892                case_id = cls._get_case_id(case_element, module_name)
893                case_result, error = cls._get_case_result(case_element)
894                if error and len(error) > 150:
895                    error = "%s..." % error[:150]
896                if case_result == "Ignored":
897                    LOG.info("get upload params: %s result is ignored",
898                             case_id)
899                    continue
900                upload_params.append(
901                    (case_id, case_result, error, start_time,
902                     end_time, task_log_path))
903        return upload_params, start_time, end_time
904
905    @classmethod
906    def get_script_result(cls, model_element):
907        disabled = int(model_element.get(ReportConstant.disabled)) if \
908            model_element.get(ReportConstant.disabled, "") else 0
909        failures = int(model_element.get(ReportConstant.failures)) if \
910            model_element.get(ReportConstant.failures, "") else 0
911        errors = int(model_element.get(ReportConstant.errors)) if \
912            model_element.get(ReportConstant.errors, "") else 0
913        unavailable = int(model_element.get(ReportConstant.unavailable)) if \
914            model_element.get(ReportConstant.unavailable, "") else 0
915        if failures > 0 or errors > 0:
916            result = "Failed"
917        elif disabled > 0 or unavailable > 0:
918            result = "Unavailable"
919        else:
920            result = "Passed"
921
922        if result == "Passed":
923            return result, ""
924        if Scheduler.mode == ModeType.decc:
925            result = "Failed"
926
927        error_msg = model_element.get(ReportConstant.message, "")
928        if not error_msg and len(model_element) > 0:
929            error_msg = model_element[0].get(ReportConstant.message, "")
930            if not error_msg and len(model_element[0]) > 0:
931                error_msg = model_element[0][0].get(ReportConstant.message, "")
932        return result, error_msg
933
934    @classmethod
935    def _get_case_id(cls, case_element, package_name):
936        class_name = case_element.get(ReportConstant.class_name, "none")
937        method_name = case_element.get(ReportConstant.name, "none")
938        case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name,
939                                       class_name, method_name)
940        return case_id
941
942    @classmethod
943    def _get_case_result(cls, case_element):
944        # get result
945        case = Case()
946        case.status = case_element.get(ReportConstant.status, "")
947        case.result = case_element.get(ReportConstant.result, "")
948        if case_element.get(ReportConstant.message, ""):
949            case.message = case_element.get(ReportConstant.message)
950        if len(case_element) > 0:
951            if not case.result:
952                case.result = ReportConstant.false
953            case.message = case_element[0].get(ReportConstant.message)
954        if case.is_passed():
955            result = "Passed"
956        elif case.is_failed():
957            result = "Failed"
958        elif case.is_blocked():
959            result = "Blocked"
960        elif case.is_ignored():
961            result = "Ignored"
962        else:
963            result = "Unavailable"
964        return result, case.message
965
966    @classmethod
967    def _get_time(cls, testsuite_element):
968        start_time = testsuite_element.get(ReportConstant.start_time, "")
969        end_time = testsuite_element.get(ReportConstant.end_time, "")
970        try:
971            if start_time and end_time:
972                start_time = int(time.mktime(time.strptime(
973                    start_time, ReportConstant.time_format)) * 1000)
974                end_time = int(time.mktime(time.strptime(
975                    end_time, ReportConstant.time_format)) * 1000)
976            else:
977                timestamp = str(testsuite_element.get(
978                    ReportConstant.time_stamp, "")).replace("T", " ")
979                cost_time = testsuite_element.get(ReportConstant.time, "")
980                if timestamp and cost_time:
981                    try:
982                        end_time = int(time.mktime(time.strptime(
983                            timestamp, ReportConstant.time_format)) * 1000)
984                    except ArithmeticError as error:
985                        LOG.error("get time error %s" % error)
986                        end_time = int(time.time() * 1000)
987                    start_time = int(end_time - float(cost_time) * 1000)
988                else:
989                    current_time = int(time.time() * 1000)
990                    start_time, end_time = current_time, current_time
991        except ArithmeticError as error:
992            LOG.error("get time error %s" % error)
993            current_time = int(time.time() * 1000)
994            start_time, end_time = current_time, current_time
995        return start_time, end_time
996
997    @classmethod
998    def upload_task_result(cls, task, error_message=""):
999        if not Scheduler.task_name:
1000            LOG.info("no need upload summary report")
1001            return
1002
1003        summary_data_report = os.path.join(task.config.report_path,
1004                                           ReportConstant.summary_data_report)
1005        summary_vision_report = os.path.join(
1006            task.config.report_path, ReportConstant.summary_vision_report)
1007        if not os.path.exists(summary_data_report):
1008            task_log_path = os.path.join(task.config.report_path, "log",
1009                                         "task_log.log")
1010            if not os.path.exists(task_log_path):
1011                task_log_path = ""
1012            Scheduler.upload_unavailable_result(str(
1013                error_message) or "summary report not exists", task_log_path)
1014            return
1015
1016        task_element = ElementTree.parse(summary_data_report).getroot()
1017        start_time, end_time = cls._get_time(task_element)
1018        task_result = cls._get_task_result(task_element)
1019        if task_result == "Unavailable":
1020            summary_vision_report = os.path.join(task.config.report_path,
1021                                                 "log", "task_log.log")
1022        error_msg = ""
1023        for child in task_element:
1024            if child.get(ReportConstant.message, ""):
1025                error_msg = "{}{}".format(
1026                    error_msg, "%s;" % child.get(ReportConstant.message))
1027        if error_msg:
1028            error_msg = error_msg[:-1]
1029        cls.upload_case_result((Scheduler.task_name, task_result,
1030                                error_msg, start_time, end_time,
1031                                summary_vision_report))
1032
1033    @classmethod
1034    def _get_task_result(cls, task_element):
1035        failures = int(task_element.get(ReportConstant.failures, 0))
1036        errors = int(task_element.get(ReportConstant.errors, 0))
1037        disabled = int(task_element.get(ReportConstant.disabled, 0))
1038        unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1039        if disabled > 0:
1040            task_result = "Blocked"
1041        elif errors > 0 or failures > 0:
1042            task_result = "Failed"
1043        elif unavailable > 0:
1044            task_result = "Unavailable"
1045        else:
1046            task_result = "Passed"
1047        return task_result
1048
1049    @classmethod
1050    def upload_unavailable_result(cls, error_msg, report_path=""):
1051        start_time = int(time.time() * 1000)
1052        Scheduler.upload_case_result((Scheduler.task_name, "Unavailable",
1053                                      error_msg, start_time, start_time,
1054                                      report_path))
1055
1056    @classmethod
1057    def upload_report_end(cls):
1058        LOG.info("Upload report end")
1059        if Scheduler.proxy is not None:
1060            Scheduler.proxy.report_end()
1061            return
1062        from agent.factory import report_end
1063        report_end()
1064
1065    @classmethod
1066    def is_module_need_retry(cls, task, module_name):
1067        failed_flag = False
1068        if check_mode(ModeType.decc):
1069            from xdevice import SuiteReporter
1070            for module, failed in SuiteReporter.get_failed_case_list():
1071                if module_name == module or str(module_name).split(
1072                        ".")[0] == module:
1073                    failed_flag = True
1074                    break
1075        else:
1076            from xdevice import ResultReporter
1077            history_report_path = \
1078                getattr(task.config, ConfigConst.history_report_path, "")
1079            params = ResultReporter.get_task_info_params(history_report_path)
1080            if params and params[3]:
1081                if dict(params[3]).get(module_name, []):
1082                    failed_flag = True
1083                elif dict(params[3]).get(str(module_name).split(".")[0], []):
1084                    failed_flag = True
1085        return failed_flag
1086
1087    @classmethod
1088    def compare_spt_time(cls, kit_spt, device_spt):
1089        if not kit_spt or not device_spt:
1090            return False
1091        try:
1092            kit_time = str(kit_spt).split("-")[:2]
1093            device_time = str(device_spt).split("-")[:2]
1094            k_spt = datetime.datetime.strptime(
1095                "-".join(kit_time), "%Y-%m")
1096            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
1097        except ValueError as value_error:
1098            LOG.debug("date format is error, %s" % value_error.args)
1099            return False
1100        month_interval = int(k_spt.month) - int(d_spt.month)
1101        year_interval = int(k_spt.year) - int(d_spt.year)
1102        LOG.debug("kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
1103                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
1104        if year_interval == 0 and month_interval in (0, 1, 2):
1105            return True
1106        if year_interval == 1 and month_interval + 12 in (1, 2):
1107            return True
1108
1109    @classmethod
1110    def _parse_version_id(cls, driver_request, kit):
1111        test_args = copy.deepcopy(
1112            driver_request.config.get(ConfigConst.testargs, dict()))
1113        version_id = ""
1114        if ConfigConst.pass_through in test_args.keys():
1115            import json
1116            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
1117            version_id = pt_dict.get("VersionID", None)
1118        elif "VersionID" in test_args.keys():
1119            version_id = test_args.get("VersionID", None)
1120        if version_id:
1121            kit_version = version_id
1122        else:
1123            kit_version = kit.properties.get(ConfigConst.version, None)
1124        return kit_version
1125
1126    @classmethod
1127    def update_test_type_in_source(cls, key, value):
1128        LOG.debug("update test type dict in source")
1129        TestDictSource.test_type[key] = value
1130
1131    @classmethod
1132    def update_ext_type_in_source(cls, key, value):
1133        LOG.debug("update ext type dict in source")
1134        TestDictSource.exe_type[key] = value
1135
1136    @classmethod
1137    def _clear_test_dict_source(cls):
1138        TestDictSource.exe_type.clear()
1139        TestDictSource.test_type.clear()
1140
1141    @classmethod
1142    def _pre_component_test(cls, config):
1143        if not config.kits:
1144            return
1145        cur_kit = None
1146        for kit in config.kits:
1147            if kit.__class__.__name__ == CKit.component:
1148                cur_kit = kit
1149                break
1150        if not cur_kit:
1151            return
1152        get_white_list = getattr(cur_kit, "get_white_list", None)
1153        if not callable(get_white_list):
1154            return
1155        subsystems, parts = get_white_list()
1156        if not subsystems and not parts:
1157            return
1158        setattr(config, ConfigConst.component_base_kit, cur_kit)
1159
1160    @classmethod
1161    def component_task_setup(cls, task, module_name):
1162        component_kit = task.config.get(ConfigConst.component_base_kit, None)
1163        if not component_kit:
1164            # only -p -s .you do not care about the components that can be
1165            # supported. you only want to run the use cases of the current
1166            # component
1167            return
1168        LOG.debug("Start component task setup")
1169        _component_mapper = task.config.get(ConfigConst.component_mapper)
1170        _subsystem, _part = _component_mapper.get(module_name)
1171
1172        is_hit = False
1173        # find in cache. if not find, update cache
1174        cache_subsystem, cache_part = component_kit.get_cache()
1175        if _subsystem in cache_subsystem or _part in cache_subsystem:
1176            is_hit = True
1177        if not is_hit:
1178            env_manager = EnvironmentManager()
1179            for _, manager in env_manager.managers.items():
1180                if getattr(manager, "devices_list", []):
1181                    for device in manager.devices_list:
1182                        component_kit.__setup__(device)
1183            cache_subsystem, cache_part = component_kit.get_cache()
1184            if _subsystem in cache_subsystem or _part in cache_subsystem:
1185                is_hit = True
1186        if not is_hit:
1187            LOG.warning("%s are skipped, no suitable component found. "
1188                        "Require subsystem=%s part=%s, no device match this"
1189                        % (module_name, _subsystem, _part))
1190