• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import datetime
21import os
22import queue
23import time
24import uuid
25import shutil
26from xml.etree import ElementTree
27
28from _core.utils import unique_id
29from _core.utils import check_mode
30from _core.utils import get_sub_path
31from _core.utils import get_filename_extension
32from _core.utils import convert_serial
33from _core.utils import get_instance_name
34from _core.utils import is_config_str
35from _core.utils import check_result_report
36from _core.utils import get_cst_time
37from _core.environment.manager_env import EnvironmentManager
38from _core.environment.manager_env import DeviceSelectionOption
39from _core.exception import ParamError
40from _core.exception import ExecuteTerminate
41from _core.exception import LiteDeviceError
42from _core.exception import DeviceError
43from _core.interface import LifeCycle
44from _core.executor.request import Request
45from _core.executor.request import Task
46from _core.executor.request import Descriptor
47from _core.plugin import get_plugin
48from _core.plugin import Plugin
49from _core.plugin import Config
50from _core.report.reporter_helper import ExecInfo
51from _core.report.reporter_helper import ReportConstant
52from _core.report.reporter_helper import Case
53from _core.report.reporter_helper import DataHelper
54from _core.constants import TestExecType
55from _core.constants import CKit
56from _core.constants import ModeType
57from _core.constants import DeviceLabelType
58from _core.constants import SchedulerType
59from _core.constants import ListenerType
60from _core.constants import ConfigConst
61from _core.constants import ReportConst
62from _core.constants import HostDrivenTestType
63from _core.executor.concurrent import DriversThread
64from _core.executor.concurrent import QueueMonitorThread
65from _core.executor.concurrent import DriversDryRunThread
66from _core.executor.source import TestSetSource
67from _core.executor.source import find_test_descriptors
68from _core.executor.source import find_testdict_descriptors
69from _core.executor.source import TestDictSource
70from _core.logger import platform_logger
71from _core.logger import add_task_file_handler
72from _core.logger import remove_task_file_handler
73from _core.logger import add_encrypt_file_handler
74from _core.logger import remove_encrypt_file_handler
75
76__all__ = ["Scheduler"]
77LOG = platform_logger("Scheduler")
78
79MAX_VISIBLE_LENGTH = 150
80
81
82@Plugin(type=Plugin.SCHEDULER, id=SchedulerType.scheduler)
83class Scheduler(object):
84    """
85    The Scheduler is the main entry point for client code that wishes to
86    discover and execute tests.
87    """
88    # factory params
89    is_execute = True
90    terminate_result = queue.Queue()
91    upload_address = ""
92    task_type = ""
93    task_name = ""
94    mode = ""
95    proxy = None
96
97    # command_queue to store test commands
98    command_queue = []
99    max_command_num = 50
100    # the number of tests in current task
101    test_number = 0
102    device_labels = []
103
104    def __discover__(self, args):
105        """Discover task to execute"""
106        config = Config()
107        config.update(args)
108        task = Task(drivers=[])
109        task.init(config)
110
111        TestDictSource.reset()
112        root_descriptor = self._find_test_root_descriptor(task.config)
113        task.set_root_descriptor(root_descriptor)
114        return task
115
116    def __execute__(self, task):
117        error_message = ""
118        try:
119            Scheduler.is_execute = True
120            if Scheduler.command_queue:
121                LOG.debug("Run command: %s" % Scheduler.command_queue[-1])
122                run_command = Scheduler.command_queue.pop()
123                task_id = str(uuid.uuid1()).split("-")[0]
124                Scheduler.command_queue.append((task_id, run_command,
125                                                task.config.report_path))
126                if len(Scheduler.command_queue) > self.max_command_num:
127                    Scheduler.command_queue.pop(0)
128
129            if getattr(task.config, ConfigConst.test_environment, ""):
130                self._reset_environment(task.config.get(
131                    ConfigConst.test_environment, ""))
132            elif getattr(task.config, ConfigConst.configfile, ""):
133                self._reset_environment(config_file=task.config.get(
134                    ConfigConst.configfile, ""))
135
136            # do with the count of repeat about a task
137            if getattr(task.config, ConfigConst.repeat, 0) > 0:
138                drivers_list = list()
139                for repeat_index in range(task.config.repeat):
140                    for driver_index in range(len(task.test_drivers)):
141                        drivers_list.append(
142                            copy.deepcopy(task.test_drivers[driver_index]))
143                task.test_drivers = drivers_list
144
145            self.test_number = len(task.test_drivers)
146
147            if task.config.exectype == TestExecType.device_test:
148                self._device_test_execute(task)
149            elif task.config.exectype == TestExecType.host_test:
150                self._host_test_execute(task)
151            else:
152                LOG.info("Exec type %s is bypassed" % task.config.exectype)
153
154        except (ParamError, ValueError, TypeError, SyntaxError, AttributeError,
155                DeviceError, LiteDeviceError, ExecuteTerminate) as exception:
156            error_no = getattr(exception, "error_no", "")
157            error_message = "%s[%s]" % (str(exception), error_no) \
158                if error_no else str(exception)
159            error_no = error_no if error_no else "00000"
160            LOG.exception(exception, exc_info=False, error_no=error_no)
161
162        finally:
163            Scheduler._clear_test_dict_source()
164            if getattr(task.config, ConfigConst.test_environment, "") or \
165                    getattr(task.config, ConfigConst.configfile, ""):
166                self._restore_environment()
167
168            if Scheduler.upload_address:
169                Scheduler.upload_task_result(task, error_message)
170                Scheduler.upload_report_end()
171
172    def _device_test_execute(self, task):
173        used_devices = {}
174        try:
175            self._dynamic_concurrent_execute(task, used_devices)
176        finally:
177            Scheduler.__reset_environment__(used_devices)
178            # generate reports
179            self._generate_task_report(task, used_devices)
180
181    def _host_test_execute(self, task):
182        """Execute host test"""
183        try:
184            # initial params
185            current_driver_threads = {}
186            test_drivers = task.test_drivers
187            message_queue = queue.Queue()
188
189            # execute test drivers
190            queue_monitor_thread = self._start_queue_monitor(
191                message_queue, test_drivers, current_driver_threads)
192            while test_drivers:
193                if len(current_driver_threads) > 5:
194                    time.sleep(3)
195                    continue
196
197                # clear remaining test drivers when scheduler is terminated
198                if not Scheduler.is_execute:
199                    LOG.info("Clear test drivers")
200                    self._clear_not_executed(task, test_drivers)
201                    break
202
203                # get test driver and device
204                test_driver = test_drivers[0]
205
206                # display executing progress
207                self._display_executing_process(None, test_driver,
208                                                test_drivers)
209
210                # start driver thread
211                self._start_driver_thread(current_driver_threads, (
212                    None, message_queue, task, test_driver))
213                test_drivers.pop(0)
214
215            # wait for all drivers threads finished and do kit teardown
216            while True:
217                if not queue_monitor_thread.is_alive():
218                    break
219                time.sleep(3)
220
221        finally:
222            # generate reports
223            self._generate_task_report(task)
224
225    def _dry_run_device_test_execute(self, task):
226        try:
227            # initial params
228            used_devices = {}
229            current_driver_threads = {}
230            test_drivers = task.test_drivers
231            message_queue = queue.Queue()
232            task_unused_env = []
233
234            # execute test drivers
235            queue_monitor_thread = self._start_queue_monitor(
236                message_queue, test_drivers, current_driver_threads)
237            while test_drivers:
238                # clear remaining test drivers when scheduler is terminated
239                if not Scheduler.is_execute:
240                    LOG.info("Clear test drivers")
241                    self._clear_not_executed(task, test_drivers)
242                    break
243
244                # get test driver and device
245                test_driver = test_drivers[0]
246                # get environment
247                try:
248                    environment = self.__allocate_environment__(
249                        task.config.__dict__, test_driver)
250                except DeviceError as exception:
251                    self._handle_device_error(exception, task, test_drivers)
252                    continue
253
254                if not Scheduler.is_execute:
255                    if environment:
256                        Scheduler.__free_environment__(environment)
257                    continue
258
259                # start driver thread
260                thread_id = self._get_thread_id(current_driver_threads)
261                driver_thread = DriversDryRunThread(test_driver, task, environment,
262                                              message_queue)
263                driver_thread.setDaemon(True)
264                driver_thread.set_thread_id(thread_id)
265                driver_thread.start()
266                current_driver_threads.setdefault(thread_id, driver_thread)
267
268                test_drivers.pop(0)
269
270            # wait for all drivers threads finished and do kit teardown
271            while True:
272                if not queue_monitor_thread.is_alive():
273                    break
274                time.sleep(3)
275
276            self._do_taskkit_teardown(used_devices, task_unused_env)
277        finally:
278            LOG.debug("Removing report_path: {}".format(task.config.report_path))
279            # delete reports
280            self.stop_task_logcat()
281            self.stop_encrypt_log()
282            shutil.rmtree(task.config.report_path)
283
284    def _generate_task_report(self, task, used_devices=None):
285        task_info = ExecInfo()
286        test_type = getattr(task.config, "testtype", [])
287        task_name = getattr(task.config, "task", "")
288        if task_name:
289            task_info.test_type = str(task_name).upper()
290        else:
291            task_info.test_type = ",".join(test_type) if test_type else "Test"
292        if used_devices:
293            serials = []
294            platforms = []
295            for serial, device in used_devices.items():
296                serials.append(convert_serial(serial))
297                platform = str(device.label).capitalize()
298                if platform not in platforms:
299                    platforms.append(platform)
300            task_info.device_name = ",".join(serials)
301            task_info.platform = ",".join(platforms)
302        else:
303            task_info.device_name = "None"
304            task_info.platform = "None"
305        task_info.test_time = task.config.start_time
306        task_info.product_info = getattr(task, "product_info", "")
307
308        listeners = self._create_listeners(task)
309        for listener in listeners:
310            listener.__ended__(LifeCycle.TestTask, task_info,
311                               test_type=task_info.test_type)
312
313    @classmethod
314    def _create_listeners(cls, task):
315        listeners = []
316        # append log listeners
317        log_listeners = get_plugin(Plugin.LISTENER, ListenerType.log)
318        for log_listener in log_listeners:
319            log_listener_instance = log_listener.__class__()
320            listeners.append(log_listener_instance)
321        # append report listeners
322        report_listeners = get_plugin(Plugin.LISTENER, ListenerType.report)
323        for report_listener in report_listeners:
324            report_listener_instance = report_listener.__class__()
325            setattr(report_listener_instance, "report_path",
326                    task.config.report_path)
327            listeners.append(report_listener_instance)
328        # append upload listeners
329        upload_listeners = get_plugin(Plugin.LISTENER, ListenerType.upload)
330        for upload_listener in upload_listeners:
331            upload_listener_instance = upload_listener.__class__()
332            listeners.append(upload_listener_instance)
333        return listeners
334
335    @staticmethod
336    def _find_device_options(environment_config, options, test_source):
337        devices_option = []
338        index = 1
339        for device_dict in environment_config:
340            label = device_dict.get("label", "")
341            required_manager = device_dict.get("type", "device")
342            required_manager = \
343                required_manager if required_manager else "device"
344            if not label:
345                continue
346            device_option = DeviceSelectionOption(options, label, test_source)
347            device_dict.pop("type", None)
348            device_dict.pop("label", None)
349            device_option.required_manager = required_manager
350            device_option.extend_value = device_dict
351            device_option.source_file = \
352                test_source.config_file or test_source.source_string
353            if hasattr(device_option, "env_index"):
354                device_option.env_index = index
355            index += 1
356            devices_option.append(device_option)
357        return devices_option
358
359    def __allocate_environment__(self, options, test_driver):
360        device_options = self.get_device_options(options,
361                                                 test_driver[1].source)
362        environment = None
363        env_manager = EnvironmentManager()
364        while True:
365            if not Scheduler.is_execute:
366                break
367            environment = env_manager.apply_environment(device_options)
368            if len(environment.devices) == len(device_options):
369                return environment
370            else:
371                env_manager.release_environment(environment)
372                LOG.debug("'%s' is waiting available device",
373                          test_driver[1].source.test_name)
374                if env_manager.check_device_exist(device_options):
375                    continue
376                else:
377                    LOG.debug("'%s' required %s devices, actually %s devices"
378                              " were found" % (test_driver[1].source.test_name,
379                                               len(device_options),
380                                               len(environment.devices)))
381                    raise DeviceError("The '%s' required device does not exist"
382                                      % test_driver[1].source.source_file,
383                                      error_no="00104")
384
385        return environment
386
387    @classmethod
388    def get_device_options(cls, options, test_source):
389        device_options = []
390        config_file = test_source.config_file
391        environment_config = []
392        from _core.testkit.json_parser import JsonParser
393        if test_source.source_string and is_config_str(
394                test_source.source_string):
395            json_config = JsonParser(test_source.source_string)
396            environment_config = json_config.get_environment()
397            device_options = cls._find_device_options(
398                environment_config, options, test_source)
399        elif config_file and os.path.exists(config_file):
400            json_config = JsonParser(test_source.config_file)
401            environment_config = json_config.get_environment()
402            device_options = cls._find_device_options(
403                environment_config, options, test_source)
404
405        device_options = cls._calculate_device_options(
406            device_options, environment_config, options, test_source)
407
408        if ConfigConst.component_mapper in options.keys():
409            required_component = options.get(ConfigConst.component_mapper). \
410                get(test_source.module_name, None)
411            for device_option in device_options:
412                device_option.required_component = required_component
413        return device_options
414
415    @staticmethod
416    def __free_environment__(environment):
417        env_manager = EnvironmentManager()
418        env_manager.release_environment(environment)
419
420    @staticmethod
421    def __reset_environment__(used_devices):
422        env_manager = EnvironmentManager()
423        env_manager.reset_environment(used_devices)
424
425    @classmethod
426    def _check_device_spt(cls, kit, driver_request, device):
427        kit_spt = cls._parse_property_value(ConfigConst.spt,
428                                            driver_request, kit)
429        if not kit_spt:
430            setattr(device, ConfigConst.task_state, False)
431            LOG.error("Spt is empty", error_no="00108")
432            return
433        if getattr(driver_request, ConfigConst.product_info, ""):
434            product_info = getattr(driver_request,
435                                   ConfigConst.product_info)
436            if not isinstance(product_info, dict):
437                LOG.warning("Product info should be dict, %s",
438                            product_info)
439                setattr(device, ConfigConst.task_state, False)
440                return
441            device_spt = product_info.get("Security Patch", None)
442            if not device_spt or not \
443                    Scheduler.compare_spt_time(kit_spt, device_spt):
444                LOG.error("The device %s spt is %s, "
445                          "and the test case spt is %s, "
446                          "which does not meet the requirements" %
447                          (device.device_sn, device_spt, kit_spt),
448                          error_no="00116")
449                setattr(device, ConfigConst.task_state, False)
450                return
451
452    def _decc_task_setup(self, environment, task):
453        config = Config()
454        config.update(task.config.__dict__)
455        config.environment = environment
456        driver_request = Request(config=config)
457
458        if environment is None:
459            return False
460
461        for device in environment.devices:
462            if not getattr(device, ConfigConst.need_kit_setup, True):
463                LOG.debug("Device %s need kit setup is false" % device)
464                continue
465
466            # do task setup for device
467            kits_copy = copy.deepcopy(task.config.kits)
468            setattr(device, ConfigConst.task_kits, kits_copy)
469            for kit in getattr(device, ConfigConst.task_kits, []):
470                if not Scheduler.is_execute:
471                    break
472                try:
473                    kit.__setup__(device, request=driver_request)
474                except (ParamError, ExecuteTerminate, DeviceError,
475                        LiteDeviceError, ValueError, TypeError,
476                        SyntaxError, AttributeError) as exception:
477                    error_no = getattr(exception, "error_no", "00000")
478                    LOG.exception(
479                        "Task setup device: %s, exception: %s" % (
480                            environment.__get_serial__(),
481                            exception), exc_info=False, error_no=error_no)
482                if kit.__class__.__name__ == CKit.query and \
483                        device.label in [DeviceLabelType.ipcamera]:
484                    self._check_device_spt(kit, driver_request, device)
485            LOG.debug("Set device %s need kit setup to false" % device)
486            setattr(device, ConfigConst.need_kit_setup, False)
487
488        for device in environment.devices:
489            if not getattr(device, ConfigConst.task_state, True):
490                return False
491
492        # set product_info to self.task
493        if getattr(driver_request, ConfigConst.product_info, "") and \
494                not getattr(task, ConfigConst.product_info, ""):
495            product_info = getattr(driver_request, ConfigConst.product_info)
496            if not isinstance(product_info, dict):
497                LOG.warning("Product info should be dict, %s",
498                            product_info)
499            else:
500                setattr(task, ConfigConst.product_info, product_info)
501        return True
502
503    def _dynamic_concurrent_execute(self, task, used_devices):
504        # initial params
505        current_driver_threads = {}
506        test_drivers = task.test_drivers
507        message_queue = queue.Queue()
508        task_unused_env = []
509
510        # execute test drivers
511        queue_monitor_thread = self._start_queue_monitor(
512            message_queue, test_drivers, current_driver_threads)
513        while test_drivers:
514            # clear remaining test drivers when scheduler is terminated
515            if not Scheduler.is_execute:
516                LOG.info("Clear test drivers")
517                self._clear_not_executed(task, test_drivers)
518                break
519
520            # get test driver and device
521            test_driver = test_drivers[0]
522
523            if getattr(task.config, ConfigConst.history_report_path, ""):
524                module_name = test_driver[1].source.module_name
525                if not self.is_module_need_retry(task, module_name):
526                    self._display_executing_process(None, test_driver,
527                                                    test_drivers)
528                    LOG.info("%s are passed, no need to retry" % module_name)
529                    self._append_history_result(task, module_name)
530                    LOG.info("")
531                    test_drivers.pop(0)
532                    continue
533
534            if getattr(task.config, ConfigConst.component_mapper, ""):
535                module_name = test_driver[1].source.module_name
536                self.component_task_setup(task, module_name)
537
538            # get environment
539            try:
540                environment = self.__allocate_environment__(
541                    task.config.__dict__, test_driver)
542            except DeviceError as exception:
543                self._handle_device_error(exception, task, test_drivers)
544                continue
545
546            if not Scheduler.is_execute:
547                if environment:
548                    Scheduler.__free_environment__(environment)
549                continue
550
551            if check_mode(ModeType.decc) or getattr(
552                    task.config, ConfigConst.check_device, False):
553                LOG.info("Start to check environment: %s" %
554                         environment.__get_serial__())
555                status = self._decc_task_setup(environment, task)
556                if not status:
557                    Scheduler.__free_environment__(environment)
558                    task_unused_env.append(environment)
559                    error_message = "Load Error[00116]"
560                    self.report_not_executed(task.config.report_path,
561                                             [test_drivers[0]],
562                                             error_message, task)
563                    test_drivers.pop(0)
564                    continue
565                else:
566                    LOG.info("Environment %s check success",
567                             environment.__get_serial__())
568
569            # display executing progress
570            self._display_executing_process(environment, test_driver,
571                                            test_drivers)
572
573            # add to used devices and set need_kit_setup attribute
574            self._append_used_devices(environment, used_devices)
575
576            # start driver thread
577            self._start_driver_thread(current_driver_threads, (
578                environment, message_queue, task, test_driver))
579            test_drivers.pop(0)
580
581        # wait for all drivers threads finished and do kit teardown
582        while True:
583            if not queue_monitor_thread.is_alive():
584                break
585            time.sleep(3)
586
587        self._do_taskkit_teardown(used_devices, task_unused_env)
588
589    @classmethod
590    def _append_history_result(cls, task, module_name):
591        history_report_path = getattr(
592            task.config, ConfigConst.history_report_path, "")
593        from _core.report.result_reporter import ResultReporter
594        params = ResultReporter.get_task_info_params(
595            history_report_path)
596
597        if not params or not params[ReportConst.data_reports]:
598            LOG.debug("Task info record data reports is empty")
599            return
600
601        report_data_dict = dict(params[ReportConst.data_reports])
602        if module_name not in report_data_dict.keys():
603            module_name_ = str(module_name).split(".")[0]
604            if module_name_ not in report_data_dict.keys():
605                LOG.error("%s not in data reports" % module_name)
606                return
607            module_name = module_name_
608
609        from xdevice import SuiteReporter
610        if check_mode(ModeType.decc):
611            virtual_report_path, report_result = SuiteReporter. \
612                get_history_result_by_module(module_name)
613            LOG.debug("Append history result: (%s, %s)" % (
614                virtual_report_path, report_result))
615            SuiteReporter.append_report_result(
616                (virtual_report_path, report_result))
617        else:
618            history_execute_result = report_data_dict.get(module_name, "")
619            LOG.info("Start copy %s" % history_execute_result)
620            file_name = get_filename_extension(history_execute_result)[0]
621            if os.path.exists(history_execute_result):
622                result_dir = \
623                    os.path.join(task.config.report_path, "result")
624                os.makedirs(result_dir, exist_ok=True)
625                target_execute_result = "%s.xml" % os.path.join(
626                    task.config.report_path, "result", file_name)
627                shutil.copyfile(history_execute_result, target_execute_result)
628                LOG.info("Copy %s to %s" % (
629                    history_execute_result, target_execute_result))
630            else:
631                error_msg = "Copy failed! %s not exists!" % \
632                            history_execute_result
633                raise ParamError(error_msg)
634
635    def _handle_device_error(self, exception, task, test_drivers):
636        self._display_executing_process(None, test_drivers[0], test_drivers)
637        error_message = "%s: %s" % \
638                        (get_instance_name(exception), exception)
639        LOG.exception(error_message, exc_info=False,
640                      error_no=exception.error_no)
641        if check_mode(ModeType.decc):
642            error_message = "Load Error[00104]"
643        self.report_not_executed(task.config.report_path, [test_drivers[0]],
644                                 error_message, task)
645
646        LOG.info("")
647        test_drivers.pop(0)
648
649    @classmethod
650    def _clear_not_executed(cls, task, test_drivers):
651        if Scheduler.mode != ModeType.decc:
652            # clear all
653            test_drivers.clear()
654            return
655        # The result is reported only in DECC mode, and also clear all.
656        LOG.error("Case no run: task execution terminated!", error_no="00300")
657        error_message = "Execute Terminate[00300]"
658        cls.report_not_executed(task.config.report_path, test_drivers,
659                                error_message)
660        test_drivers.clear()
661
662    @classmethod
663    def report_not_executed(cls, report_path, test_drivers, error_message,
664                            task=None):
665        # traversing list to get remained elements
666        for test_driver in test_drivers:
667            # get report file
668            if task and getattr(task.config, "testdict", ""):
669                report_file = os.path.join(get_sub_path(
670                    test_driver[1].source.source_file),
671                    "%s.xml" % test_driver[1].source.test_name)
672            else:
673                report_file = os.path.join(
674                    report_path, "result",
675                    "%s.xml" % test_driver[1].source.module_name)
676
677            # get report name
678            report_name = test_driver[1].source.test_name if \
679                not test_driver[1].source.test_name.startswith("{") \
680                else "report"
681
682            # get module name
683            module_name = test_driver[1].source.module_name
684
685            # here, normally create empty report and then upload result
686            check_result_report(report_path, report_file, error_message,
687                                report_name, module_name)
688
689    def _start_driver_thread(self, current_driver_threads, thread_params):
690        environment, message_queue, task, test_driver = thread_params
691        thread_id = self._get_thread_id(current_driver_threads)
692        driver_thread = DriversThread(test_driver, task, environment,
693                                      message_queue)
694        driver_thread.setDaemon(True)
695        driver_thread.set_thread_id(thread_id)
696        driver_thread.set_listeners(self._create_listeners(task))
697        driver_thread.start()
698        current_driver_threads.setdefault(thread_id, driver_thread)
699
700    @classmethod
701    def _do_taskkit_teardown(cls, used_devices, task_unused_env):
702        for device in used_devices.values():
703            if getattr(device, ConfigConst.need_kit_setup, True):
704                continue
705
706            for kit in getattr(device, ConfigConst.task_kits, []):
707                try:
708                    kit.__teardown__(device)
709                except Exception as error:
710                    LOG.debug("Do task kit teardown: %s" % error)
711            setattr(device, ConfigConst.task_kits, [])
712            setattr(device, ConfigConst.need_kit_setup, True)
713
714        for environment in task_unused_env:
715            for device in environment.devices:
716                setattr(device, ConfigConst.task_state, True)
717                setattr(device, ConfigConst.need_kit_setup, True)
718
719    def _display_executing_process(self, environment, test_driver,
720                                   test_drivers):
721        source_content = test_driver[1].source.source_file or \
722                         test_driver[1].source.source_string
723        if environment is None:
724            LOG.info("[%d / %d] Executing: %s, Driver: %s" %
725                     (self.test_number - len(test_drivers) + 1,
726                      self.test_number, source_content,
727                      test_driver[1].source.test_type))
728            return
729
730        LOG.info("[%d / %d] Executing: %s, Device: %s, Driver: %s" %
731                 (self.test_number - len(test_drivers) + 1,
732                  self.test_number, source_content,
733                  environment.__get_serial__(),
734                  test_driver[1].source.test_type))
735
736    @classmethod
737    def _get_thread_id(cls, current_driver_threads):
738        thread_id = get_cst_time().strftime(
739            '%Y-%m-%d-%H-%M-%S-%f')
740        while thread_id in current_driver_threads.keys():
741            thread_id = get_cst_time().strftime(
742                '%Y-%m-%d-%H-%M-%S-%f')
743        return thread_id
744
745    @classmethod
746    def _append_used_devices(cls, environment, used_devices):
747        if environment is not None:
748            for device in environment.devices:
749                device_serial = device.__get_serial__() if device else "None"
750                if device_serial and device_serial not in used_devices.keys():
751                    used_devices[device_serial] = device
752
753    @staticmethod
754    def _start_queue_monitor(message_queue, test_drivers,
755                             current_driver_threads):
756        queue_monitor_thread = QueueMonitorThread(message_queue,
757                                                  current_driver_threads,
758                                                  test_drivers)
759        queue_monitor_thread.setDaemon(True)
760        queue_monitor_thread.start()
761        return queue_monitor_thread
762
763    def exec_command(self, command, options):
764        """
765        Directly executes a command without adding it to the command queue.
766        """
767        if command != "run":
768            raise ParamError("unsupported command action: %s" % command,
769                             error_no="00100")
770        exec_type = options.exectype
771        if exec_type in [TestExecType.device_test, TestExecType.host_test,
772                         TestExecType.host_driven_test]:
773            self._exec_task(options)
774        else:
775            LOG.error("Unsupported execution type '%s'" % exec_type,
776                      error_no="00100")
777
778        return
779
780    def _exec_task(self, options):
781        """
782        Directly allocates a device and execute a device test.
783        """
784        try:
785            task = self.__discover__(options.__dict__)
786            self.__execute__(task)
787        except (ParamError, ValueError, TypeError, SyntaxError,
788                AttributeError) as exception:
789            error_no = getattr(exception, "error_no", "00000")
790            LOG.exception("%s: %s" % (get_instance_name(exception), exception),
791                          exc_info=False, error_no=error_no)
792            if Scheduler.upload_address:
793                Scheduler.upload_unavailable_result(str(exception.args))
794                Scheduler.upload_report_end()
795        finally:
796            self.stop_task_logcat()
797            self.stop_encrypt_log()
798
799    @classmethod
800    def _reset_environment(cls, environment="", config_file=""):
801        env_manager = EnvironmentManager()
802        env_manager.env_stop()
803        EnvironmentManager(environment, config_file)
804
805    @classmethod
806    def _restore_environment(cls):
807        env_manager = EnvironmentManager()
808        env_manager.env_stop()
809        EnvironmentManager()
810
811    @classmethod
812    def start_task_log(cls, log_path):
813        tool_file_name = "task_log.log"
814        tool_log_file = os.path.join(log_path, tool_file_name)
815        add_task_file_handler(tool_log_file)
816
817    @classmethod
818    def start_encrypt_log(cls, log_path):
819        from _core.report.encrypt import check_pub_key_exist
820        if check_pub_key_exist():
821            encrypt_file_name = "task_log.ept"
822            encrypt_log_file = os.path.join(log_path, encrypt_file_name)
823            add_encrypt_file_handler(encrypt_log_file)
824
825    @classmethod
826    def stop_task_logcat(cls):
827        remove_task_file_handler()
828
829    @classmethod
830    def stop_encrypt_log(cls):
831        remove_encrypt_file_handler()
832
833    @staticmethod
834    def _find_test_root_descriptor(config):
835        if getattr(config, ConfigConst.task, None) or \
836                getattr(config, ConfigConst.testargs, None):
837            Scheduler._pre_component_test(config)
838
839        if getattr(config, ConfigConst.subsystems, "") or \
840                getattr(config, ConfigConst.parts, "") or \
841                getattr(config, ConfigConst.component_base_kit, ""):
842            uid = unique_id("Scheduler", "component")
843            if config.subsystems or config.parts:
844                test_set = (config.subsystems, config.parts)
845            else:
846                kit = getattr(config, ConfigConst.component_base_kit)
847                test_set = kit.get_white_list()
848
849            root = Descriptor(uuid=uid, name="component",
850                              source=TestSetSource(test_set),
851                              container=True)
852
853            root.children = find_test_descriptors(config)
854            return root
855            # read test list from testdict
856        if getattr(config, ConfigConst.testdict, "") != "" and getattr(
857                config, ConfigConst.testfile, "") == "":
858            uid = unique_id("Scheduler", "testdict")
859            root = Descriptor(uuid=uid, name="testdict",
860                              source=TestSetSource(config.testdict),
861                              container=True)
862            root.children = find_testdict_descriptors(config)
863            return root
864
865            # read test list from testfile, testlist or task
866        test_set = getattr(config, ConfigConst.testfile, "") or getattr(
867            config, ConfigConst.testlist, "") or getattr(
868            config, ConfigConst.task, "") or getattr(
869            config, ConfigConst.testcase)
870        # read test list from testfile, testlist or task
871        test_set = getattr(config, "testfile", "") or getattr(
872            config, "testlist", "") or getattr(config, "task", "") or getattr(
873            config, "testcase")
874        if test_set:
875            fname, _ = get_filename_extension(test_set)
876            uid = unique_id("Scheduler", fname)
877            root = Descriptor(uuid=uid, name=fname,
878                              source=TestSetSource(test_set), container=True)
879            root.children = find_test_descriptors(config)
880            return root
881        else:
882            raise ParamError("no test file, list, dict, case or task found",
883                             error_no="00102")
884
885    @classmethod
886    def terminate_cmd_exec(cls):
887        Scheduler.is_execute = False
888        LOG.info("Start to terminate execution")
889        return Scheduler.terminate_result.get()
890
891    @classmethod
892    def upload_case_result(cls, upload_param):
893        if not Scheduler.upload_address:
894            return
895        case_id, result, error, start_time, end_time, report_path = \
896            upload_param
897        if error and len(error) > MAX_VISIBLE_LENGTH:
898            error = "%s..." % error[:MAX_VISIBLE_LENGTH]
899        LOG.info(
900            "Get upload params: %s, %s, %s, %s, %s, %s" % (
901                case_id, result, error, start_time, end_time, report_path))
902        if Scheduler.proxy is not None:
903            Scheduler.proxy.upload_result(case_id, result, error, start_time,
904                                          end_time, report_path)
905        else:
906            LOG.debug("There is no proxy, can't upload case result")
907
908    @classmethod
909    def upload_module_result(cls, exec_message):
910        if not Scheduler.is_execute:
911            return
912        result_file = exec_message.get_result()
913        request = exec_message.get_request()
914
915        test_name = request.root.source.test_name
916        if not result_file or not os.path.exists(result_file):
917            LOG.error("%s result not exists", test_name, error_no="00200")
918            return
919
920        test_type = request.root.source.test_type
921        LOG.info("Need upload result: %s, test type: %s" %
922                 (result_file, test_type))
923        upload_params, _, _ = cls._get_upload_params(result_file, request)
924        if not upload_params:
925            LOG.error("%s no test case result to upload" % result_file,
926                      error_no="00201")
927            return
928        LOG.info("Need upload %s case" % len(upload_params))
929        upload_suite = []
930        for upload_param in upload_params:
931            case_id, result, error, start_time, end_time, report_path = \
932                upload_param
933            case = {"caseid": case_id, "result": result, "error": error,
934                    "start": start_time, "end": end_time,
935                    "report": report_path}
936            LOG.info("Case info: %s", case)
937            upload_suite.append(case)
938        if Scheduler.proxy is not None:
939            Scheduler.proxy.upload_batch(upload_suite)
940        else:
941            LOG.debug("There is no proxy, can't upload module result")
942
943    @classmethod
944    def _get_upload_params(cls, result_file, request):
945        upload_params = []
946        report_path = result_file
947        testsuites_element = DataHelper.parse_data_report(report_path)
948        start_time, end_time = cls._get_time(testsuites_element)
949        if request.get_test_type() == HostDrivenTestType.device_test:
950            for model_element in testsuites_element:
951                case_id = model_element.get(ReportConstant.name, "")
952                case_result, error = cls.get_script_result(model_element)
953                if error and len(error) > MAX_VISIBLE_LENGTH:
954                    error = "$s..." % error[:MAX_VISIBLE_LENGTH]
955                upload_params.append(
956                    (case_id, case_result, error, start_time,
957                     end_time, request.config.report_path,))
958        else:
959            for testsuite_element in testsuites_element:
960                if check_mode(ModeType.developer):
961                    module_name = str(get_filename_extension(
962                        report_path)[0]).split(".")[0]
963                else:
964                    module_name = testsuite_element.get(ReportConstant.name,
965                                                        "none")
966                for case_element in testsuite_element:
967                    case_id = cls._get_case_id(case_element, module_name)
968                    case_result, error = cls._get_case_result(case_element)
969                    if error and len(error) > MAX_VISIBLE_LENGTH:
970                        error = "%s..." % error[:MAX_VISIBLE_LENGTH]
971                    if case_result == "Ignored":
972                        LOG.info("Get upload params: %s result is ignored",
973                                 case_id)
974                        continue
975                    upload_params.append(
976                        (case_id, case_result, error, start_time,
977                         end_time, request.config.report_path,))
978        return upload_params, start_time, end_time
979
980    @classmethod
981    def get_script_result(cls, model_element):
982        disabled = int(model_element.get(ReportConstant.disabled)) if \
983            model_element.get(ReportConstant.disabled, "") else 0
984        failures = int(model_element.get(ReportConstant.failures)) if \
985            model_element.get(ReportConstant.failures, "") else 0
986        errors = int(model_element.get(ReportConstant.errors)) if \
987            model_element.get(ReportConstant.errors, "") else 0
988        unavailable = int(model_element.get(ReportConstant.unavailable)) if \
989            model_element.get(ReportConstant.unavailable, "") else 0
990        if failures > 0 or errors > 0:
991            result = "Failed"
992        elif disabled > 0 or unavailable > 0:
993            result = "Unavailable"
994        else:
995            result = "Passed"
996
997        if result == "Passed":
998            return result, ""
999        if Scheduler.mode == ModeType.decc:
1000            result = "Failed"
1001
1002        error_msg = model_element.get(ReportConstant.message, "")
1003        if not error_msg and len(model_element) > 0:
1004            error_msg = model_element[0].get(ReportConstant.message, "")
1005            if not error_msg and len(model_element[0]) > 0:
1006                error_msg = model_element[0][0].get(ReportConstant.message, "")
1007        return result, error_msg
1008
1009    @classmethod
1010    def _get_case_id(cls, case_element, package_name):
1011        class_name = case_element.get(ReportConstant.class_name, "none")
1012        method_name = case_element.get(ReportConstant.name, "none")
1013        case_id = "{}#{}#{}#{}".format(Scheduler.task_name, package_name,
1014                                       class_name, method_name)
1015        return case_id
1016
1017    @classmethod
1018    def _get_case_result(cls, case_element):
1019        # get result
1020        case = Case()
1021        case.status = case_element.get(ReportConstant.status, "")
1022        case.result = case_element.get(ReportConstant.result, "")
1023        if case_element.get(ReportConstant.message, ""):
1024            case.message = case_element.get(ReportConstant.message)
1025        if len(case_element) > 0:
1026            if not case.result:
1027                case.result = ReportConstant.false
1028            case.message = case_element[0].get(ReportConstant.message)
1029        if case.is_passed():
1030            result = "Passed"
1031        elif case.is_failed():
1032            result = "Failed"
1033        elif case.is_blocked():
1034            result = "Blocked"
1035        elif case.is_ignored():
1036            result = "Ignored"
1037        else:
1038            result = "Unavailable"
1039        return result, case.message
1040
1041    @classmethod
1042    def _get_time(cls, testsuite_element):
1043        start_time = testsuite_element.get(ReportConstant.start_time, "")
1044        end_time = testsuite_element.get(ReportConstant.end_time, "")
1045        try:
1046            if start_time and end_time:
1047                start_time = int(time.mktime(time.strptime(
1048                    start_time, ReportConstant.time_format)) * 1000)
1049                end_time = int(time.mktime(time.strptime(
1050                    end_time, ReportConstant.time_format)) * 1000)
1051            else:
1052                timestamp = str(testsuite_element.get(
1053                    ReportConstant.time_stamp, "")).replace("T", " ")
1054                cost_time = testsuite_element.get(ReportConstant.time, "")
1055                if timestamp and cost_time:
1056                    try:
1057                        end_time = int(time.mktime(time.strptime(
1058                            timestamp, ReportConstant.time_format)) * 1000)
1059                    except ArithmeticError as error:
1060                        LOG.error("Get time error %s" % error)
1061                        end_time = int(time.time() * 1000)
1062                    start_time = int(end_time - float(cost_time) * 1000)
1063                else:
1064                    current_time = int(time.time() * 1000)
1065                    start_time, end_time = current_time, current_time
1066        except ArithmeticError as error:
1067            LOG.error("Get time error %s" % error)
1068            current_time = int(time.time() * 1000)
1069            start_time, end_time = current_time, current_time
1070        return start_time, end_time
1071
1072    @classmethod
1073    def upload_task_result(cls, task, error_message=""):
1074        if not Scheduler.task_name:
1075            LOG.info("No need upload summary report")
1076            return
1077
1078        summary_data_report = os.path.join(task.config.report_path,
1079                                           ReportConstant.summary_data_report)
1080        if not os.path.exists(summary_data_report):
1081            Scheduler.upload_unavailable_result(str(
1082                error_message) or "summary report not exists",
1083                                                task.config.report_path)
1084            return
1085
1086        task_element = ElementTree.parse(summary_data_report).getroot()
1087        start_time, end_time = cls._get_time(task_element)
1088        task_result = cls._get_task_result(task_element)
1089        error_msg = ""
1090        for child in task_element:
1091            if child.get(ReportConstant.message, ""):
1092                error_msg = "{}{}".format(
1093                    error_msg, "%s;" % child.get(ReportConstant.message))
1094        if error_msg:
1095            error_msg = error_msg[:-1]
1096        cls.upload_case_result((Scheduler.task_name, task_result,
1097                                error_msg, start_time, end_time,
1098                                task.config.report_path))
1099
1100    @classmethod
1101    def _get_task_result(cls, task_element):
1102        failures = int(task_element.get(ReportConstant.failures, 0))
1103        errors = int(task_element.get(ReportConstant.errors, 0))
1104        disabled = int(task_element.get(ReportConstant.disabled, 0))
1105        unavailable = int(task_element.get(ReportConstant.unavailable, 0))
1106        if disabled > 0:
1107            task_result = "Blocked"
1108        elif errors > 0 or failures > 0:
1109            task_result = "Failed"
1110        elif unavailable > 0:
1111            task_result = "Unavailable"
1112        else:
1113            task_result = "Passed"
1114        return task_result
1115
1116    @classmethod
1117    def upload_unavailable_result(cls, error_msg, report_path=""):
1118        start_time = int(time.time() * 1000)
1119        Scheduler.upload_case_result((Scheduler.task_name, "Unavailable",
1120                                      error_msg, start_time, start_time,
1121                                      report_path))
1122
1123    @classmethod
1124    def upload_report_end(cls):
1125        if getattr(cls, "tmp_json", None):
1126            os.remove(cls.tmp_json)
1127            del cls.tmp_json
1128        LOG.info("Upload report end")
1129        if Scheduler.proxy is not None:
1130            Scheduler.proxy.report_end()
1131        else:
1132            LOG.debug("There is no proxy, can't upload report end")
1133
1134    @classmethod
1135    def is_module_need_retry(cls, task, module_name):
1136        failed_flag = False
1137        if check_mode(ModeType.decc):
1138            from xdevice import SuiteReporter
1139            for module, failed in SuiteReporter.get_failed_case_list():
1140                if module_name == module or str(module_name).split(
1141                        ".")[0] == module:
1142                    failed_flag = True
1143                    break
1144        else:
1145            from xdevice import ResultReporter
1146            history_report_path = \
1147                getattr(task.config, ConfigConst.history_report_path, "")
1148            params = ResultReporter.get_task_info_params(history_report_path)
1149            if params and params[ReportConst.unsuccessful_params]:
1150                if dict(params[ReportConst.unsuccessful_params]).get(module_name, []):
1151                    failed_flag = True
1152                elif dict(params[ReportConst.unsuccessful_params]).get(str(module_name).split(".")[0], []):
1153                    failed_flag = True
1154        return failed_flag
1155
1156    @classmethod
1157    def compare_spt_time(cls, kit_spt, device_spt):
1158        if not kit_spt or not device_spt:
1159            return False
1160        try:
1161            kit_time = str(kit_spt).split("-")[:2]
1162            device_time = str(device_spt).split("-")[:2]
1163            k_spt = datetime.datetime.strptime(
1164                "-".join(kit_time), "%Y-%m")
1165            d_spt = datetime.datetime.strptime("-".join(device_time), "%Y-%m")
1166        except ValueError as value_error:
1167            LOG.debug("Date format is error, %s" % value_error.args)
1168            return False
1169        month_interval = int(k_spt.month) - int(d_spt.month)
1170        year_interval = int(k_spt.year) - int(d_spt.year)
1171        LOG.debug("Kit spt (year=%s, month=%s), device spt (year=%s, month=%s)"
1172                  % (k_spt.year, k_spt.month, d_spt.year, d_spt.month))
1173        if year_interval < 0:
1174            return True
1175        if year_interval == 0 and month_interval in range(-11, 3):
1176            return True
1177        if year_interval == 1 and month_interval + 12 in (1, 2):
1178            return True
1179
1180    @classmethod
1181    def _parse_property_value(cls, property_name, driver_request, kit):
1182        test_args = copy.deepcopy(
1183            driver_request.config.get(ConfigConst.testargs, dict()))
1184        property_value = ""
1185        if ConfigConst.pass_through in test_args.keys():
1186            import json
1187            pt_dict = json.loads(test_args.get(ConfigConst.pass_through, ""))
1188            property_value = pt_dict.get(property_name, None)
1189        elif property_name in test_args.keys:
1190            property_value = test_args.get(property_name, None)
1191        return property_value if property_value else \
1192            kit.properties.get(property_name, None)
1193
1194    @classmethod
1195    def _calculate_device_options(cls, device_options, environment_config,
1196                                  options, test_source):
1197        # calculate difference
1198        diff_value = len(environment_config) - len(device_options)
1199        if device_options and diff_value == 0:
1200            return device_options
1201
1202        else:
1203            diff_value = diff_value if diff_value else 1
1204            if str(test_source.source_file).endswith(".bin"):
1205                device_option = DeviceSelectionOption(
1206                    options, DeviceLabelType.ipcamera, test_source)
1207            else:
1208                device_option = DeviceSelectionOption(
1209                    options, None, test_source)
1210
1211            device_option.source_file = \
1212                test_source.source_file or test_source.source_string
1213            device_option.required_manager = "device"
1214            device_options.extend([device_option] * diff_value)
1215            LOG.debug("Assign device options and it's length is %s"
1216                      % len(device_options))
1217        return device_options
1218
1219    @classmethod
1220    def update_test_type_in_source(cls, key, value):
1221        LOG.debug("update test type dict in source")
1222        TestDictSource.test_type[key] = value
1223
1224    @classmethod
1225    def update_ext_type_in_source(cls, key, value):
1226        LOG.debug("update ext type dict in source")
1227        TestDictSource.exe_type[key] = value
1228
1229    @classmethod
1230    def _clear_test_dict_source(cls):
1231        TestDictSource.exe_type.clear()
1232        TestDictSource.test_type.clear()
1233
1234    @classmethod
1235    def _pre_component_test(cls, config):
1236        if not config.kits:
1237            return
1238        cur_kit = None
1239        for kit in config.kits:
1240            if kit.__class__.__name__ == CKit.component:
1241                cur_kit = kit
1242                break
1243        if not cur_kit:
1244            return
1245        get_white_list = getattr(cur_kit, "get_white_list", None)
1246        if not callable(get_white_list):
1247            return
1248        subsystems, parts = get_white_list()
1249        if not subsystems and not parts:
1250            return
1251        setattr(config, ConfigConst.component_base_kit, cur_kit)
1252
1253    @classmethod
1254    def component_task_setup(cls, task, module_name):
1255        component_kit = task.config.get(ConfigConst.component_base_kit, None)
1256        if not component_kit:
1257            # only -p -s .you do not care about the components that can be
1258            # supported. you only want to run the use cases of the current
1259            # component
1260            return
1261        LOG.debug("Start component task setup")
1262        _component_mapper = task.config.get(ConfigConst.component_mapper)
1263        _subsystem, _part = _component_mapper.get(module_name)
1264
1265        is_hit = False
1266        # find in cache. if not find, update cache
1267        cache_subsystem, cache_part = component_kit.get_cache()
1268        if _subsystem in cache_subsystem or _part in cache_subsystem:
1269            is_hit = True
1270        if not is_hit:
1271            env_manager = EnvironmentManager()
1272            for _, manager in env_manager.managers.items():
1273                if getattr(manager, "devices_list", []):
1274                    for device in manager.devices_list:
1275                        component_kit.__setup__(device)
1276            cache_subsystem, cache_part = component_kit.get_cache()
1277            if _subsystem in cache_subsystem or _part in cache_subsystem:
1278                is_hit = True
1279        if not is_hit:
1280            LOG.warning("%s are skipped, no suitable component found. "
1281                        "Require subsystem=%s part=%s, no device match this"
1282                        % (module_name, _subsystem, _part))
1283